diff --git a/CHANGELOG.md b/CHANGELOG.md index 97b59e2..39f8dcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,82 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [1.2.1] - unreleased +## [2.0.0] - unreleased + +### Security advisory + +- **Be cautious when piping raw `_msg` into `email_body_html`.** The example + config switched `body: "{{ _msg }}"` in v1.2.0 (#26 fix), and operators may + reasonably mirror that in `email_body_html`. The email notifier marks `body` + as `safe` (pre-escaped HTML) before injection into the email envelope, so a + log line containing raw HTML or `"}); - let result = engine.render("email_alert", &fields, "test_rule").unwrap(); + let result = engine + .render("email_alert", &fields, "test_rule", "vlprod") + .unwrap(); let email_body_html = result.email_body_html.unwrap(); // HTML should be escaped @@ -756,7 +795,9 @@ mod tests { let engine = TemplateEngine::new(templates); let fields = json!({"nginx.http.request_id": "abc"}); - let result = engine.render("alert", &fields, "test_rule").unwrap(); + let result = engine + .render("alert", &fields, "test_rule", "vlprod") + .unwrap(); assert_eq!(result.title, "abc"); assert_eq!(result.body, "id=abc"); } @@ -784,7 +825,9 @@ mod tests { "nginx.http.status_code": "400" }); - let result = engine.render("my_template", &fields, "test_rule").unwrap(); + let result = engine + .render("my_template", &fields, "test_rule", "vlprod") + .unwrap(); assert_eq!(result.title, "T"); assert_eq!(result.body, "B"); let email_body_html = result.email_body_html.unwrap(); @@ -814,7 +857,7 @@ mod tests { let engine = TemplateEngine::new(templates); let fields = json!({"host": "server-01"}); - let result = engine.render("alert", &fields, "VM_OFF").unwrap(); + let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap(); assert_eq!(result.title, "Alert VM_OFF"); } @@ -832,7 +875,7 @@ mod tests { let engine = TemplateEngine::new(templates); let fields = json!({"host": "server-01"}); - let result = engine.render("alert", &fields, "VM_OFF").unwrap(); + let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap(); assert_eq!( result.body, "rule=VM_OFF\nhost=server-01\nrule_again=VM_OFF" @@ -854,13 +897,93 @@ mod tests { let engine = TemplateEngine::new(templates); let fields = json!({"host": "server-01"}); - let result = engine.render("email_alert", &fields, "VM_OFF").unwrap(); + let result = engine + .render("email_alert", &fields, "VM_OFF", "vlprod") + .unwrap(); assert_eq!( result.email_body_html.unwrap(), "

Rule: VM_OFF on server-01

" ); } + // =================================================================== + // v2.0.0: vl_source available in layer 1 templates (title, body, + // email_body_html), matching the layer 2 notifier-level contexts and + // the throttle key render context. + // =================================================================== + + #[test] + fn render_injects_vl_source_in_title() { + let mut templates = HashMap::new(); + templates.insert( + "alert".to_string(), + make_template("[{{ vl_source }}] {{ rule_name }}", "body"), + ); + + let engine = TemplateEngine::new(templates); + let fields = json!({"host": "server-01"}); + + let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap(); + assert_eq!(result.title, "[vlprod] VM_OFF"); + } + + #[test] + fn render_injects_vl_source_in_body() { + let mut templates = HashMap::new(); + templates.insert( + "alert".to_string(), + make_template("title", "source={{ vl_source }}\nhost={{ host }}"), + ); + + let engine = TemplateEngine::new(templates); + let fields = json!({"host": "server-01"}); + + let result = engine.render("alert", &fields, "VM_OFF", "vldev").unwrap(); + assert_eq!(result.body, "source=vldev\nhost=server-01"); + } + + #[test] + fn render_injects_vl_source_in_email_body_html() { + let mut templates = HashMap::new(); + templates.insert( + "email_alert".to_string(), + make_template_with_email_body_html( + "t", + "b", + "

Source: {{ vl_source }}, host: {{ host }}

", + ), + ); + + let engine = TemplateEngine::new(templates); + let fields = json!({"host": "server-01"}); + + let result = engine + .render("email_alert", &fields, "VM_OFF", "vlprod") + .unwrap(); + assert_eq!( + result.email_body_html.unwrap(), + "

Source: vlprod, host: server-01

" + ); + } + + #[test] + fn render_vl_source_synthetic_overrides_event_field() { + // Collision policy: synthetic vl_source wins over any event field + // literally named "vl_source" (matches rule_name collision policy). + let mut templates = HashMap::new(); + templates.insert( + "alert".to_string(), + make_template("{{ vl_source }}", "{{ vl_source }}"), + ); + + let engine = TemplateEngine::new(templates); + let fields = json!({"vl_source": "evil", "host": "server-01"}); + + let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap(); + assert_eq!(result.title, "vlprod"); + assert_eq!(result.body, "vlprod"); + } + #[test] fn render_rule_name_synthetic_overrides_event_field() { // Collision policy: synthetic rule_name wins over any event field @@ -874,7 +997,7 @@ mod tests { let engine = TemplateEngine::new(templates); let fields = json!({"rule_name": "event-value", "host": "server-01"}); - let result = engine.render("alert", &fields, "VM_OFF").unwrap(); + let result = engine.render("alert", &fields, "VM_OFF", "vlprod").unwrap(); assert_eq!(result.title, "VM_OFF"); assert_eq!(result.body, "VM_OFF"); } @@ -891,7 +1014,7 @@ mod tests { let fields = json!({"host": "server-01"}); let result = engine - .render("mattermost_alert", &fields, "test_rule") + .render("mattermost_alert", &fields, "test_rule", "vlprod") .unwrap(); assert!( diff --git a/src/throttle.rs b/src/throttle.rs index c9260e5..21bbd05 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -65,6 +65,9 @@ pub struct Throttler { max_count: u32, /// Rule name for logging and metrics (Arc to avoid cloning). rule_name: Arc, + /// VL source name bound to this throttler (per-task). Threaded into every + /// rendered key so the `(rule, source)` bucket is isolated by default. + vl_source: Arc, /// Pre-created Jinja environment for template rendering (H1 fix). jinja_env: Environment<'static>, } @@ -76,14 +79,11 @@ impl Throttler { /// /// * `config` - Optional throttle configuration. If None, creates a pass-through throttler. /// * `rule_name` - Name of the rule for logging and metrics. - /// - /// # Example - /// - /// ```ignore - /// let throttler = Throttler::new(Some(&compiled_throttle), "my_rule"); - /// ``` - pub fn new(config: Option<&CompiledThrottle>, rule_name: &str) -> Self { - Self::with_capacity(config, rule_name, DEFAULT_MAX_CAPACITY) + /// * `vl_source` - VL source name bound to the task (injected into render + /// context so `{{ vl_source }}` works in `throttle.key` and the default + /// key is per-source by construction). + pub fn new(config: Option<&CompiledThrottle>, rule_name: &str, vl_source: &str) -> Self { + Self::with_capacity(config, rule_name, vl_source, DEFAULT_MAX_CAPACITY) } /// Create a new Throttler with custom max capacity (for testing). @@ -92,10 +92,12 @@ impl Throttler { /// /// * `config` - Optional throttle configuration. /// * `rule_name` - Name of the rule for logging and metrics. + /// * `vl_source` - VL source name bound to this task's throttler. /// * `max_capacity` - Maximum number of keys in the cache (FR25). pub fn with_capacity( config: Option<&CompiledThrottle>, rule_name: &str, + vl_source: &str, max_capacity: u64, ) -> Self { let (key_template, max_count, window) = match config { @@ -108,12 +110,14 @@ impl Throttler { if t.count == 0 { tracing::warn!( rule_name = %rule_name, + vl_source = %vl_source, "Throttle count is 0, all alerts after first will be throttled" ); } if t.window.is_zero() { tracing::warn!( rule_name = %rule_name, + vl_source = %vl_source, "Throttle window is 0, entries will expire immediately" ); } @@ -133,6 +137,7 @@ impl Throttler { key_template, max_count, rule_name: Arc::from(rule_name), + vl_source: Arc::from(vl_source), jinja_env, } } @@ -161,14 +166,18 @@ impl Throttler { "Throttle count updated" ); - // L1: Pre-convert rule_name to String once for metrics (required for 'static) + // L1: Pre-convert rule_name and vl_source to String once for metrics + // (required for 'static label storage). let rule_name_str = self.rule_name.to_string(); + let vl_source_str = self.vl_source.to_string(); if count <= self.max_count { - // M3: Increment metric for passed alerts + // M3: Increment metric for passed alerts (gains `vl_source` for + // multi-source observability — v2.0.0 part 2). metrics::counter!( "valerter_alerts_passed_total", - "rule_name" => rule_name_str + "rule_name" => rule_name_str, + "vl_source" => vl_source_str, ) .increment(1); @@ -177,16 +186,19 @@ impl Throttler { // Log at DEBUG level (throttling is normal behavior) tracing::debug!( rule_name = %self.rule_name, + vl_source = %self.vl_source, throttle_key = %key, count = count, max_count = self.max_count, "Alert throttled" ); - // Increment metric (FR23, FR24) + // Increment metric (FR23, FR24); gains `vl_source` to disambiguate + // throttle hot-spots per source. metrics::counter!( "valerter_alerts_throttled_total", - "rule_name" => rule_name_str + "rule_name" => rule_name_str, + "vl_source" => vl_source_str, ) .increment(1); @@ -196,26 +208,27 @@ impl Throttler { /// Render the throttle key from template and fields. /// - /// If no template is configured, returns a global key for the rule. - /// If template rendering fails, logs a warning and returns a fallback key. + /// If no template is configured, returns the per-source default key + /// `"{rule}-{source}:global"` so multi-source deployments see isolated + /// throttle buckets without any config. If rendering fails, logs a + /// warning and returns a fallback key. fn render_key(&self, fields: &Value) -> String { match &self.key_template { Some(template) => { - // Inject synthetic `rule_name` (issue #31) so users can write - // `{{ rule_name }}` in throttle.key. Synthetic wins over any - // event field with the same name, matching layer 1/2 template + // Inject synthetic `rule_name` (issue #31) and `vl_source` + // (multi-source v2.0.0). Synthetic values win over any event + // field with the same name, matching layer 1/2 template // behavior. - let enriched_ctx = enrich_with_rule_name(fields, &self.rule_name); - // H1 fix: Use pre-created jinja_env instead of creating new one + let enriched_ctx = enrich_with_context(fields, &self.rule_name, &self.vl_source); match self.jinja_env.render_str(template, &enriched_ctx) { Ok(key) => { tracing::trace!(rendered_key = %key, "Throttle key rendered"); key } Err(e) => { - // Template error - log and use fallback tracing::warn!( rule_name = %self.rule_name, + vl_source = %self.vl_source, template = %template, error = %e, "Failed to render throttle key, using fallback" @@ -225,8 +238,11 @@ impl Throttler { } } None => { - // No template = global throttle for the rule - format!("{}:global", self.rule_name) + // Default key is per-(rule, source) so buckets are isolated + // per-source by construction. Equivalent to rendering + // `"{{ rule_name }}-{{ vl_source }}:global"` via Jinja, but + // inlined to avoid the render round-trip on every call. + format!("{}-{}:global", self.rule_name, self.vl_source) } } } @@ -242,20 +258,25 @@ impl Throttler { } /// Unflatten dotted event keys (issue #25) then inject the synthetic -/// `rule_name` key (issue #31). Matches the layer 1 template rendering path -/// so users can reference both dotted event fields (`{{ nginx.http.status }}`) -/// and `{{ rule_name }}` inside a `throttle.key` template consistently. +/// `rule_name` (issue #31) and `vl_source` (v2.0.0 multi-source) keys. +/// Matches the layer 1 template rendering path so users can reference both +/// dotted event fields (`{{ nginx.http.status }}`) and the synthetic keys +/// inside a `throttle.key` template consistently. /// /// Returns the original value unchanged if it is not a JSON object (should -/// not happen in practice, VL events are always objects). The synthetic -/// value wins over any event field literally named `rule_name`. -fn enrich_with_rule_name(fields: &Value, rule_name: &str) -> Value { +/// not happen in practice, VL events are always objects). Synthetic values +/// win over any event field literally named `rule_name` or `vl_source`. +fn enrich_with_context(fields: &Value, rule_name: &str, vl_source: &str) -> Value { let mut ctx = crate::parser::unflatten_dotted_keys(fields); if let Some(obj) = ctx.as_object_mut() { obj.insert( "rule_name".to_string(), Value::String(rule_name.to_string()), ); + obj.insert( + "vl_source".to_string(), + Value::String(vl_source.to_string()), + ); } ctx } @@ -291,7 +312,7 @@ mod tests { #[test] fn render_key_with_simple_template() { let config = make_config(Some("{{ host }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01", "port": "Gi0/1"}); let key = throttler.render_key(&fields); @@ -306,7 +327,7 @@ mod tests { #[test] fn render_key_with_composite_template() { let config = make_config(Some("{{ host }}-{{ port }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01", "port": "Gi0/1"}); let key = throttler.render_key(&fields); @@ -321,7 +342,7 @@ mod tests { #[test] fn render_key_with_missing_field_returns_empty_value() { let config = make_config(Some("{{ host }}-{{ missing }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); let key = throttler.render_key(&fields); @@ -337,7 +358,7 @@ mod tests { #[test] fn first_alert_passes() { let config = make_config(Some("{{ host }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); let result = throttler.check(&fields); @@ -352,7 +373,7 @@ mod tests { #[test] fn alerts_up_to_count_pass() { let config = make_config(Some("{{ host }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); @@ -369,7 +390,7 @@ mod tests { #[test] fn alert_after_count_is_throttled() { let config = make_config(Some("{{ host }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); @@ -395,7 +416,7 @@ mod tests { count: 2, window: Duration::from_millis(100), // Very short for testing }; - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); @@ -424,7 +445,7 @@ mod tests { let config = make_config(Some("{{ key }}"), 2, 3600); // Use with_capacity to set a small max (5 keys) - let throttler = Throttler::with_capacity(Some(&config), "test_rule", 5); + let throttler = Throttler::with_capacity(Some(&config), "test_rule", "vlprod", 5); // Fill cache with 5 different keys, each gets 2 alerts (at max) for i in 0..5 { @@ -469,27 +490,44 @@ mod tests { #[test] fn no_key_template_uses_global_key() { let config = make_config(None, 2, 60); - let throttler = Throttler::new(Some(&config), "my_rule"); + let throttler = Throttler::new(Some(&config), "my_rule", "vlprod"); let fields1 = json!({"host": "SW-01"}); let fields2 = json!({"host": "SW-02"}); - // Both should use the same global key "my_rule:global" + // Both should use the same per-source default key + // "my_rule-vlprod:global" - cross-host but not cross-source. assert_eq!(throttler.check(&fields1), ThrottleResult::Pass); assert_eq!(throttler.check(&fields2), ThrottleResult::Pass); - // Third from either should be throttled (same global key) + // Third from either should be throttled (same default key) assert_eq!(throttler.check(&fields1), ThrottleResult::Throttled); } #[test] - fn global_key_format() { + fn default_key_format_is_rule_dash_source_global() { + // Spec: default throttle key is `{rule}-{source}:global`. + // Used to be `{rule}:global` (pre-v2.0.0); breaking change for + // multi-source deployments and locks per-source bucket isolation. let config = make_config(None, 2, 60); - let throttler = Throttler::new(Some(&config), "my_rule"); + let throttler = Throttler::new(Some(&config), "my_rule", "vlprod"); let fields = json!({}); let key = throttler.render_key(&fields); - assert_eq!(key, "my_rule:global"); + assert_eq!(key, "my_rule-vlprod:global"); + } + + #[test] + fn default_key_isolates_buckets_per_source() { + // Two throttlers with the same rule but different sources must + // produce different default keys, so buckets are isolated per-source. + let config = make_config(None, 2, 60); + let throttler_a = Throttler::new(Some(&config), "VM_OFF", "vlprod"); + let throttler_b = Throttler::new(Some(&config), "VM_OFF", "vldev"); + + let fields = json!({}); + assert_eq!(throttler_a.render_key(&fields), "VM_OFF-vlprod:global"); + assert_eq!(throttler_b.render_key(&fields), "VM_OFF-vldev:global"); } // =================================================================== @@ -499,7 +537,7 @@ mod tests { #[test] fn render_key_with_nested_fields() { let config = make_config(Some("{{ data.server.name }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({ "data": { @@ -520,7 +558,7 @@ mod tests { #[test] fn different_keys_are_throttled_independently() { let config = make_config(Some("{{ host }}"), 2, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let sw01 = json!({"host": "SW-01"}); let sw02 = json!({"host": "SW-02"}); @@ -538,7 +576,7 @@ mod tests { #[test] fn no_config_passes_all() { - let throttler = Throttler::new(None, "test_rule"); + let throttler = Throttler::new(None, "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); @@ -551,7 +589,7 @@ mod tests { #[test] fn reset_clears_all_entries() { let config = make_config(Some("{{ host }}"), 2, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); @@ -570,7 +608,7 @@ mod tests { #[test] fn debug_format_shows_useful_info() { let config = make_config(Some("{{ host }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let debug = format!("{:?}", throttler); @@ -590,7 +628,7 @@ mod tests { #[test] fn render_key_includes_rule_name() { let config = make_config(Some("{{ rule_name }}-{{ host }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "VM_OFF"); + let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod"); let fields = json!({"host": "SW-01"}); let key = throttler.render_key(&fields); @@ -604,7 +642,7 @@ mod tests { // tened the same way template rendering does, so `{{ nginx.http.status }}` // works here too (not just in `title`/`body`). let config = make_config(Some("{{ rule_name }}-{{ nginx.http.status_code }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "VM_OFF"); + let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod"); let fields = json!({"nginx.http.status_code": "404"}); let key = throttler.render_key(&fields); @@ -617,7 +655,7 @@ mod tests { // Collision policy: synthetic rule_name wins over any event field // literally named "rule_name". let config = make_config(Some("{{ rule_name }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "VM_OFF"); + let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod"); let fields = json!({"rule_name": "event-value", "host": "SW-01"}); let key = throttler.render_key(&fields); @@ -625,11 +663,50 @@ mod tests { assert_eq!(key, "VM_OFF"); } + // =================================================================== + // v2.0.0: vl_source injected into throttle key render context + // =================================================================== + + #[test] + fn render_key_includes_vl_source() { + let config = make_config(Some("{{ rule_name }}-{{ vl_source }}"), 3, 60); + let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod"); + + let fields = json!({"host": "SW-01"}); + let key = throttler.render_key(&fields); + + assert_eq!(key, "VM_OFF-vlprod"); + } + + #[test] + fn render_key_vl_source_synthetic_overrides_event_field() { + // Collision policy: synthetic vl_source wins over any event field + // literally named "vl_source" (matches rule_name collision policy). + let config = make_config(Some("{{ vl_source }}"), 3, 60); + let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod"); + + let fields = json!({"vl_source": "evil", "host": "SW-01"}); + let key = throttler.render_key(&fields); + + assert_eq!(key, "vlprod"); + } + + #[test] + fn render_key_custom_template_with_both_synthetics() { + let config = make_config(Some("{{ rule_name }}-{{ vl_source }}-{{ host }}"), 3, 60); + let throttler = Throttler::new(Some(&config), "VM_OFF", "vlprod"); + + let fields = json!({"host": "SW-01"}); + let key = throttler.render_key(&fields); + + assert_eq!(key, "VM_OFF-vlprod-SW-01"); + } + #[test] fn template_error_uses_fallback_key() { // Invalid template syntax that minijinja can't render let config = make_config(Some("{{ nonexistent_filter | bad_filter }}"), 3, 60); - let throttler = Throttler::new(Some(&config), "test_rule"); + let throttler = Throttler::new(Some(&config), "test_rule", "vlprod"); let fields = json!({"host": "SW-01"}); let key = throttler.render_key(&fields); diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 0000000..38be906 --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,12 @@ +//! Shared helpers for integration tests. +//! +//! Each file under `tests/` compiles as its own crate, so shared helpers live +//! here and are pulled in via `mod common;` from the consuming test file. +//! +//! The `#[allow(dead_code)]` below is intentional: not every consumer uses +//! every sub-module, and Cargo's per-test-crate compilation would otherwise +//! flag unused items. + +#![allow(dead_code)] + +pub mod vl_events; diff --git a/tests/common/vl_events.rs b/tests/common/vl_events.rs new file mode 100644 index 0000000..1186837 --- /dev/null +++ b/tests/common/vl_events.rs @@ -0,0 +1,182 @@ +//! Loader for the versioned VictoriaLogs event fixture corpus. +//! +//! Fixtures live under `tests/fixtures/vl_events/*.json`, one anonymised +//! one-line JSON event per file. `index.yaml` in the same directory is the +//! authoritative manifest: it documents each fixture (description, +//! `source_system`, `tags`, `prevents_regression_for`). +//! +//! The manifest is parsed once per test crate via `OnceLock`. +//! +//! # Panics +//! +//! Helpers panic aggressively on misuse (missing fixture file, fixture not +//! listed in the index, malformed manifest). The assumption is that these +//! are programmer errors in tests, not runtime failures to recover from. +//! The consistency test `vl_fixtures_consistency.rs` catches them in CI. +//! +//! # Examples +//! +//! ```ignore +//! mod common; +//! use common::vl_events::{load_fixture, load_fixtures_by_tag}; +//! +//! let event = load_fixture("nginx_http_400.json"); +//! assert_eq!(event["_stream"].as_str().is_some(), true); +//! +//! let dotted = load_fixtures_by_tag("dotted_keys"); +//! assert!(!dotted.is_empty()); +//! ``` + +use serde::Deserialize; +use serde_json::Value; +use std::collections::BTreeMap; +use std::path::{Path, PathBuf}; +use std::sync::OnceLock; + +/// Relative path (from crate root) to the fixture directory. +const FIXTURES_DIR: &str = "tests/fixtures/vl_events"; + +/// Filename of the manifest inside `FIXTURES_DIR`. +const INDEX_FILENAME: &str = "index.yaml"; + +/// One manifest entry as declared in `index.yaml`. +#[derive(Debug, Clone, Deserialize)] +pub struct FixtureEntry { + pub description: String, + pub source_system: String, + #[serde(default)] + pub tags: Vec, + #[serde(default)] + pub prevents_regression_for: Vec, +} + +/// Root of the manifest document. +#[derive(Debug, Clone, Deserialize)] +struct Manifest { + fixtures: BTreeMap, +} + +/// Cache: manifest parsed once per test crate. +static MANIFEST: OnceLock> = OnceLock::new(); + +/// Absolute path to the fixture directory. +fn fixtures_dir() -> PathBuf { + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + Path::new(manifest_dir).join(FIXTURES_DIR) +} + +/// Load and cache the manifest. +fn manifest() -> &'static BTreeMap { + MANIFEST.get_or_init(|| { + let path = fixtures_dir().join(INDEX_FILENAME); + let raw = std::fs::read_to_string(&path).unwrap_or_else(|e| { + panic!("failed to read fixture manifest {}: {}", path.display(), e) + }); + let parsed: Manifest = serde_yaml::from_str(&raw).unwrap_or_else(|e| { + panic!("failed to parse fixture manifest {}: {}", path.display(), e) + }); + parsed.fixtures + }) +} + +/// Return the full manifest (for consistency tests). +pub fn manifest_entries() -> &'static BTreeMap { + manifest() +} + +/// Read and parse a fixture by stem (e.g. `"nginx_http_400"`) or by full +/// filename (e.g. `"nginx_http_400.json"`). The `.json` suffix is optional. +/// +/// # Panics +/// +/// Panics if the fixture is not listed in `index.yaml`, if the file is +/// missing, or if it is not valid JSON. +pub fn load_fixture(name: &str) -> Value { + let filename = if name.ends_with(".json") { + name.to_string() + } else { + format!("{}.json", name) + }; + if !manifest().contains_key(&filename) { + panic!( + "fixture '{}' is not declared in {}/{}. Add it to the manifest \ + or pick an existing name.", + name, FIXTURES_DIR, INDEX_FILENAME + ); + } + load_fixture_from_disk(&filename) +} + +/// Read and parse a fixture file from disk without checking the manifest. +/// +/// Used internally to catch the "listed in index but missing on disk" case +/// with a clearer message than a plain IO error. +fn load_fixture_from_disk(name: &str) -> Value { + let path = fixtures_dir().join(name); + let raw = std::fs::read_to_string(&path).unwrap_or_else(|e| { + panic!( + "fixture '{}' declared in index but missing on disk ({}): {}", + name, + path.display(), + e + ) + }); + serde_json::from_str(&raw) + .unwrap_or_else(|e| panic!("fixture '{}' is not valid JSON: {}", name, e)) +} + +/// Return every fixture listed in the manifest as `(name, value)`. +/// +/// Ordering is stable (manifest keys are a `BTreeMap`). +pub fn all_fixtures() -> Vec<(String, Value)> { + manifest() + .keys() + .map(|name| (name.clone(), load_fixture_from_disk(name))) + .collect() +} + +/// Return all fixtures whose manifest entry has `tag` in its `tags` list. +/// +/// Returns an empty `Vec` for an unknown tag; callers that expect a +/// non-empty result should assert it themselves. +pub fn load_fixtures_by_tag(tag: &str) -> Vec<(String, Value)> { + manifest() + .iter() + .filter(|(_, entry)| entry.tags.iter().any(|t| t == tag)) + .map(|(name, _)| (name.clone(), load_fixture_from_disk(name))) + .collect() +} + +/// Return all fixtures whose manifest entry has `source_system == system`. +/// +/// Empty `Vec` for unknown systems; callers assert the expected size. +pub fn load_fixtures_by_source_system(system: &str) -> Vec<(String, Value)> { + manifest() + .iter() + .filter(|(_, entry)| entry.source_system == system) + .map(|(name, _)| (name.clone(), load_fixture_from_disk(name))) + .collect() +} + +/// Return the set of filenames physically present in `FIXTURES_DIR`, +/// excluding the manifest itself. Used by the consistency test. +pub fn filesystem_fixtures() -> Vec { + let dir = fixtures_dir(); + let entries = std::fs::read_dir(&dir) + .unwrap_or_else(|e| panic!("failed to read fixture directory {}: {}", dir.display(), e)); + let mut names: Vec = entries + .filter_map(|e| e.ok()) + .filter_map(|entry| { + let file_name = entry.file_name().to_string_lossy().to_string(); + if file_name == INDEX_FILENAME { + None + } else if entry.path().extension().and_then(|s| s.to_str()) == Some("json") { + Some(file_name) + } else { + None + } + }) + .collect(); + names.sort(); + names +} diff --git a/tests/fixtures/config_disabled_invalid.yaml b/tests/fixtures/config_disabled_invalid.yaml index b0f146b..7f23240 100644 --- a/tests/fixtures/config_disabled_invalid.yaml +++ b/tests/fixtures/config_disabled_invalid.yaml @@ -1,7 +1,8 @@ # Configuration fixture with disabled rule containing invalid regex # Validation should STILL fail even for disabled rules (AD-11) victorialogs: - url: "http://victorialogs:9428" + default: + url: "http://victorialogs:9428" notifiers: test: diff --git a/tests/fixtures/config_email_missing_email_body_html.yaml b/tests/fixtures/config_email_missing_email_body_html.yaml index 3a73135..2accd97 100644 --- a/tests/fixtures/config_email_missing_email_body_html.yaml +++ b/tests/fixtures/config_email_missing_email_body_html.yaml @@ -2,7 +2,8 @@ # This should fail validation at startup (AC7) victorialogs: - url: "http://victorialogs:9428" + default: + url: "http://victorialogs:9428" defaults: throttle: diff --git a/tests/fixtures/config_invalid_basic_auth.yaml b/tests/fixtures/config_invalid_basic_auth.yaml index 29c74fe..79de6a1 100644 --- a/tests/fixtures/config_invalid_basic_auth.yaml +++ b/tests/fixtures/config_invalid_basic_auth.yaml @@ -1,9 +1,10 @@ # Configuration fixture with incomplete Basic Auth (missing password) victorialogs: - url: "https://victorialogs.secure.local:9428" - basic_auth: - username: "testuser" - # password is missing - this should fail to parse + default: + url: "https://victorialogs.secure.local:9428" + basic_auth: + username: "testuser" + # password is missing - this should fail to parse defaults: throttle: diff --git a/tests/fixtures/config_invalid_notifier_type.yaml b/tests/fixtures/config_invalid_notifier_type.yaml index 67ebce2..08f6946 100644 --- a/tests/fixtures/config_invalid_notifier_type.yaml +++ b/tests/fixtures/config_invalid_notifier_type.yaml @@ -1,6 +1,7 @@ # Configuration fixture with invalid notifier type (Story 6.2 - AC4) victorialogs: - url: "http://victorialogs:9428" + default: + url: "http://victorialogs:9428" defaults: throttle: diff --git a/tests/fixtures/config_invalid_regex.yaml b/tests/fixtures/config_invalid_regex.yaml index 161d11b..22c4b24 100644 --- a/tests/fixtures/config_invalid_regex.yaml +++ b/tests/fixtures/config_invalid_regex.yaml @@ -1,6 +1,7 @@ # Configuration fixture with invalid regex pattern for testing fail-fast validation victorialogs: - url: "http://victorialogs:9428" + default: + url: "http://victorialogs:9428" notifiers: test: diff --git a/tests/fixtures/config_invalid_template.yaml b/tests/fixtures/config_invalid_template.yaml index 9728f1c..f511146 100644 --- a/tests/fixtures/config_invalid_template.yaml +++ b/tests/fixtures/config_invalid_template.yaml @@ -1,6 +1,7 @@ # Configuration fixture with invalid Jinja template for testing fail-fast validation victorialogs: - url: "http://victorialogs:9428" + default: + url: "http://victorialogs:9428" notifiers: test: diff --git a/tests/fixtures/config_minimal.yaml b/tests/fixtures/config_minimal.yaml index b0e714f..2be185e 100644 --- a/tests/fixtures/config_minimal.yaml +++ b/tests/fixtures/config_minimal.yaml @@ -1,6 +1,7 @@ # Minimal valid config - matches README example victorialogs: - url: "http://localhost:9428" + default: + url: "http://localhost:9428" notifiers: mattermost-ops: diff --git a/tests/fixtures/config_no_notifier.yaml b/tests/fixtures/config_no_notifier.yaml index 40cb944..bcc1b72 100644 --- a/tests/fixtures/config_no_notifier.yaml +++ b/tests/fixtures/config_no_notifier.yaml @@ -1,7 +1,8 @@ # Invalid config - no notifiers configured # This should fail validation with "no notifiers configured" error victorialogs: - url: "http://localhost:9428" + default: + url: "http://localhost:9428" defaults: throttle: diff --git a/tests/fixtures/config_no_template.yaml b/tests/fixtures/config_no_template.yaml index 7c1a841..da952dc 100644 --- a/tests/fixtures/config_no_template.yaml +++ b/tests/fixtures/config_no_template.yaml @@ -1,7 +1,8 @@ # Invalid config - no templates defined # This should fail validation with "no templates defined" error victorialogs: - url: "http://localhost:9428" + default: + url: "http://localhost:9428" notifiers: test: diff --git a/tests/fixtures/config_valid.yaml b/tests/fixtures/config_valid.yaml index f9e50da..c24c573 100644 --- a/tests/fixtures/config_valid.yaml +++ b/tests/fixtures/config_valid.yaml @@ -1,6 +1,7 @@ # Valid configuration fixture for tests victorialogs: - url: "http://victorialogs:9428" + default: + url: "http://victorialogs:9428" notifiers: default-mattermost: diff --git a/tests/fixtures/config_with_auth.yaml b/tests/fixtures/config_with_auth.yaml index 0176fac..b124158 100644 --- a/tests/fixtures/config_with_auth.yaml +++ b/tests/fixtures/config_with_auth.yaml @@ -2,15 +2,16 @@ # NOTE: This is a TEST FIXTURE only - credentials are intentionally hardcoded. # In production, always use environment variables: ${VL_USER}, ${VL_PASS}, etc. victorialogs: - url: "https://victorialogs.secure.local:9428" - basic_auth: - username: "testuser" - password: "testpassword" - headers: - X-API-Key: "secret-api-key-12345" - X-Custom-Header: "custom-value" - tls: - verify: false + default: + url: "https://victorialogs.secure.local:9428" + basic_auth: + username: "testuser" + password: "testpassword" + headers: + X-API-Key: "secret-api-key-12345" + X-Custom-Header: "custom-value" + tls: + verify: false notifiers: test-notifier: diff --git a/tests/fixtures/config_with_notifiers.yaml b/tests/fixtures/config_with_notifiers.yaml index e12825f..c248d15 100644 --- a/tests/fixtures/config_with_notifiers.yaml +++ b/tests/fixtures/config_with_notifiers.yaml @@ -1,6 +1,7 @@ # Configuration fixture with notifiers section (Story 6.2) victorialogs: - url: "http://victorialogs:9428" + default: + url: "http://victorialogs:9428" defaults: throttle: diff --git a/tests/fixtures/multi-file-collision/config.yaml b/tests/fixtures/multi-file-collision/config.yaml index 88f7a65..cc73fe4 100644 --- a/tests/fixtures/multi-file-collision/config.yaml +++ b/tests/fixtures/multi-file-collision/config.yaml @@ -1,7 +1,8 @@ # Config with a rule that will collide with one in rules.d/ victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 notifiers: test: diff --git a/tests/fixtures/multi-file-cross-ref/config.yaml b/tests/fixtures/multi-file-cross-ref/config.yaml index ce78aa1..cca4373 100644 --- a/tests/fixtures/multi-file-cross-ref/config.yaml +++ b/tests/fixtures/multi-file-cross-ref/config.yaml @@ -1,7 +1,8 @@ # Config for testing cross-file references victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 notifiers: test: diff --git a/tests/fixtures/multi-file-empty/config.yaml b/tests/fixtures/multi-file-empty/config.yaml index ebd599a..952fb46 100644 --- a/tests/fixtures/multi-file-empty/config.yaml +++ b/tests/fixtures/multi-file-empty/config.yaml @@ -1,7 +1,8 @@ # Config with empty .d/ directories (should work fine) victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 notifiers: test: diff --git a/tests/fixtures/multi-file-intra-collision/config.yaml b/tests/fixtures/multi-file-intra-collision/config.yaml index ecdde8a..7ad2bd6 100644 --- a/tests/fixtures/multi-file-intra-collision/config.yaml +++ b/tests/fixtures/multi-file-intra-collision/config.yaml @@ -1,7 +1,8 @@ # Config for testing collision within .d/ directory victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 notifiers: test: diff --git a/tests/fixtures/multi-file-invalid/config.yaml b/tests/fixtures/multi-file-invalid/config.yaml index efe4f00..4df2cd1 100644 --- a/tests/fixtures/multi-file-invalid/config.yaml +++ b/tests/fixtures/multi-file-invalid/config.yaml @@ -1,7 +1,8 @@ # Valid config, but rules.d/ contains invalid YAML victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 defaults: throttle: diff --git a/tests/fixtures/multi-file-notifier-collision/config.yaml b/tests/fixtures/multi-file-notifier-collision/config.yaml index c75f71b..64a7f98 100644 --- a/tests/fixtures/multi-file-notifier-collision/config.yaml +++ b/tests/fixtures/multi-file-notifier-collision/config.yaml @@ -1,5 +1,6 @@ victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 defaults: throttle: diff --git a/tests/fixtures/multi-file-only-d/config.yaml b/tests/fixtures/multi-file-only-d/config.yaml index 4304f87..5182365 100644 --- a/tests/fixtures/multi-file-only-d/config.yaml +++ b/tests/fixtures/multi-file-only-d/config.yaml @@ -2,7 +2,8 @@ # Everything loaded from .d/ directories victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 defaults: throttle: diff --git a/tests/fixtures/multi-file-template-collision/config.yaml b/tests/fixtures/multi-file-template-collision/config.yaml index 97cb76c..971177d 100644 --- a/tests/fixtures/multi-file-template-collision/config.yaml +++ b/tests/fixtures/multi-file-template-collision/config.yaml @@ -1,5 +1,6 @@ victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 notifiers: test: diff --git a/tests/fixtures/multi-file/config.yaml b/tests/fixtures/multi-file/config.yaml index 277187a..34c64c4 100644 --- a/tests/fixtures/multi-file/config.yaml +++ b/tests/fixtures/multi-file/config.yaml @@ -1,7 +1,8 @@ # Base config with inline rules/templates/notifiers for multi-file merge testing victorialogs: - url: http://victorialogs:9428 + default: + url: http://victorialogs:9428 defaults: throttle: diff --git a/tests/fixtures/vl_events/audit_event.json b/tests/fixtures/vl_events/audit_event.json new file mode 100644 index 0000000..f472c70 --- /dev/null +++ b/tests/fixtures/vl_events/audit_event.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T11:10:08.770Z","_stream":"{group=\"audit\",host=\"app-host-07\"}","_stream_id":"00000000000000005566778899aabbcc","_msg":"USER_LOGIN pid=4422 uid=0 auid=1001 ses=3 msg='op=login id=1001 exe=\"/usr/sbin/sshd\" hostname=? addr=198.51.100.77 terminal=ssh res=success'","type":"USER_LOGIN","auid":"1001","uid":"0","res":"success","host":"app-host-07"} diff --git a/tests/fixtures/vl_events/cisco_sw_critical.json b/tests/fixtures/vl_events/cisco_sw_critical.json new file mode 100644 index 0000000..e6f1ccf --- /dev/null +++ b/tests/fixtures/vl_events/cisco_sw_critical.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:20:17.000Z","_stream":"{group=\"network\",source=\"syslog\"}","_stream_id":"0000000000000000b2c3d4e5f6071122","_msg":"<187>Apr 15 10:20:17 switch-sw-01 %LINK-3-UPDOWN: Interface GigabitEthernet0/24, changed state to down","host":"switch-sw-01","facility":"local7","severity":"critical","appname":"cisco-ios"} diff --git a/tests/fixtures/vl_events/docker_container_exit.json b/tests/fixtures/vl_events/docker_container_exit.json new file mode 100644 index 0000000..0f24592 --- /dev/null +++ b/tests/fixtures/vl_events/docker_container_exit.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T11:15:44.229Z","_stream":"{group=\"docker\",host=\"app-host-08\"}","_stream_id":"000000000000000066778899aabbccdd","_msg":"container sidecar-logger exited with code 1","container_id":"9f8e7d6c5b4a3e2f1d0c9b8a7654321fedcba","container_name":"sidecar-logger","image":"ghcr.io/example/sidecar-logger:1.2.3","exit_code":"1","host":"app-host-08"} diff --git a/tests/fixtures/vl_events/edge_dotted_keys_literal.json b/tests/fixtures/vl_events/edge_dotted_keys_literal.json new file mode 100644 index 0000000..8a5bc76 --- /dev/null +++ b/tests/fixtures/vl_events/edge_dotted_keys_literal.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T11:05:12.333Z","_stream":"{group=\"app\",host=\"app-host-06\"}","_stream_id":"0000000000000000445566778899aabb","_msg":"GET /healthz -> 200","nginx.http.method":"GET","nginx.http.path":"/healthz","nginx.http.status":"200","nginx.http.request_id":"req-0f1e2d3c4b5a","nginx.http.remote_addr":"203.0.113.9","host":"app-host-06"} diff --git a/tests/fixtures/vl_events/edge_empty_fields.json b/tests/fixtures/vl_events/edge_empty_fields.json new file mode 100644 index 0000000..84ee27e --- /dev/null +++ b/tests/fixtures/vl_events/edge_empty_fields.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T11:00:00.000Z","_stream":"{group=\"app\",host=\"app-host-05\"}","_stream_id":"00000000000000003344556677889900","_msg":"request completed","host":"app-host-05","request_id":"","user_id":"","session":"","error":""} diff --git a/tests/fixtures/vl_events/edge_unicode_msg.json b/tests/fixtures/vl_events/edge_unicode_msg.json new file mode 100644 index 0000000..34d1e0d --- /dev/null +++ b/tests/fixtures/vl_events/edge_unicode_msg.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:55:41.671Z","_stream":"{group=\"app\",host=\"app-host-04\"}","_stream_id":"000000000000000022334455667788aa","_msg":"\u2705 \u652f\u4ed8\u5931\u8d25 payment failed for user \u5c71\u7530\u592a\u90ce","host":"app-host-04","app":"checkout","locale":"ja_JP.UTF-8"} diff --git a/tests/fixtures/vl_events/index.yaml b/tests/fixtures/vl_events/index.yaml new file mode 100644 index 0000000..c9c7a2d --- /dev/null +++ b/tests/fixtures/vl_events/index.yaml @@ -0,0 +1,107 @@ +# Manifest for VictoriaLogs event fixtures. +# +# Each entry documents one file under this directory. The entry key MUST match +# the filename on disk exactly; the consistency test (`tests/vl_fixtures_consistency.rs`) +# enforces the invariant that index.yaml and the filesystem are in sync. +# +# Schema per entry: +# description: free-form sentence describing what this event represents. +# source_system: one of nginx | cisco | k8s | windows | journald | otel | raw | docker | audit | syslog | edge_case +# tags: list of free-form strings. Consistent vocabulary: +# dotted_keys, flat_keys, regex_extract, pascal_case, multiline_msg, +# unicode, http, syslog, empty_fields, embedded_json, minimal, labels. +# prevents_regression_for: list of issue references (e.g. "#25"). Empty list is valid. +# +# All data is anonymised. Hostnames are fake (app-host-NN, switch-sw-01, k8s-node-01). +# IPs use RFC 5737 ranges (192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24). +# No real tokens, trace ids, or tenants. +fixtures: + nginx_http_400.json: + description: "nginx reverse proxy 400 response with vector-forwarded flat-dotted keys" + source_system: nginx + tags: [dotted_keys, flat_keys, http] + prevents_regression_for: ["#25"] + + nginx_http_500.json: + description: "nginx 500 upstream failure, same flat-dotted shape as 400 case" + source_system: nginx + tags: [dotted_keys, flat_keys, http] + prevents_regression_for: ["#25"] + + cisco_sw_critical.json: + description: "Cisco IOS syslog link-down event with regex-extractable switch name in _msg" + source_system: cisco + tags: [regex_extract, syslog] + prevents_regression_for: [] + + k8s_pod_oom.json: + description: "Kubernetes container OOMKilled event with kubernetes.* labels as flat-dotted keys" + source_system: k8s + tags: [dotted_keys, flat_keys, labels] + prevents_regression_for: [] + + windows_security_logoff.json: + description: "Windows Security Event 4634 (account logoff) with PascalCase keys and escaped multiline _msg" + source_system: windows + tags: [pascal_case, multiline_msg] + prevents_regression_for: [] + + journald_cron.json: + description: "journald CRON execution with _SYSTEMD_* / _TRANSPORT fields" + source_system: journald + tags: [flat_keys, syslog] + prevents_regression_for: [] + + otel_span.json: + description: "OpenTelemetry server span with trace_id, span_id and dotted service.* / http.* attributes" + source_system: otel + tags: [dotted_keys, flat_keys] + prevents_regression_for: [] + + raw_message_only.json: + description: "Minimal event: only _time, _stream, _stream_id and _msg, no structured fields" + source_system: raw + tags: [minimal] + prevents_regression_for: [] + + json_embedded_msg.json: + description: "_msg contains an embedded JSON payload, verbatim (not pre-parsed by the ingester)" + source_system: raw + tags: [embedded_json] + prevents_regression_for: [] + + edge_unicode_msg.json: + description: "_msg containing CJK characters and an emoji to exercise UTF-8 handling" + source_system: edge_case + tags: [unicode] + prevents_regression_for: [] + + edge_empty_fields.json: + description: "Several structured fields present but empty string, to exercise empty-guard paths" + source_system: edge_case + tags: [empty_fields] + prevents_regression_for: ["#26"] + + edge_dotted_keys_literal.json: + description: "Exact shape from #25 with literal dotted keys nginx.http.* preserved top-level" + source_system: edge_case + tags: [dotted_keys, flat_keys, http] + prevents_regression_for: ["#25"] + + audit_event.json: + description: "Linux auditd USER_LOGIN record with embedded key=value payload inside _msg" + source_system: audit + tags: [regex_extract, flat_keys] + prevents_regression_for: [] + + docker_container_exit.json: + description: "Docker engine event for a sidecar container exiting with non-zero status" + source_system: docker + tags: [flat_keys] + prevents_regression_for: [] + + syslog_raw_daemon.json: + description: "Raw RFC3164 syslog line from chronyd forwarded via a syslog collector" + source_system: syslog + tags: [syslog, regex_extract] + prevents_regression_for: [] diff --git a/tests/fixtures/vl_events/journald_cron.json b/tests/fixtures/vl_events/journald_cron.json new file mode 100644 index 0000000..c1380eb --- /dev/null +++ b/tests/fixtures/vl_events/journald_cron.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:35:00.112Z","_stream":"{group=\"systemd\",host=\"app-host-01\"}","_stream_id":"0000000000000000e5f6071122334455","_msg":"(root) CMD (test -x /usr/sbin/anacron || ( cd / && run-parts --report /etc/cron.hourly ))","_SYSTEMD_UNIT":"cron.service","_SYSTEMD_SLICE":"system.slice","_TRANSPORT":"journald","_HOSTNAME":"app-host-01","_PID":"2481","_UID":"0","SYSLOG_IDENTIFIER":"CRON","PRIORITY":"6"} diff --git a/tests/fixtures/vl_events/json_embedded_msg.json b/tests/fixtures/vl_events/json_embedded_msg.json new file mode 100644 index 0000000..43ec67b --- /dev/null +++ b/tests/fixtures/vl_events/json_embedded_msg.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:50:55.818Z","_stream":"{group=\"app\",host=\"app-host-03\"}","_stream_id":"0000000000000000112233445566778a","_msg":"{\"level\":\"error\",\"event\":\"payment_declined\",\"user_id\":\"u-00012\",\"amount\":42.50,\"reason\":\"insufficient_funds\"}","host":"app-host-03","app":"billing"} diff --git a/tests/fixtures/vl_events/k8s_pod_oom.json b/tests/fixtures/vl_events/k8s_pod_oom.json new file mode 100644 index 0000000..023c1b9 --- /dev/null +++ b/tests/fixtures/vl_events/k8s_pod_oom.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:25:44.877Z","_stream":"{group=\"k8s\",namespace=\"payments\",pod=\"worker-7c9b\"}","_stream_id":"0000000000000000c3d4e5f607112233","_msg":"Container worker terminated (OOMKilled), exit code 137","kubernetes.namespace_name":"payments","kubernetes.pod_name":"worker-7c9b4d5f-abc12","kubernetes.container_name":"worker","kubernetes.node_name":"k8s-node-01","kubernetes.labels.app":"worker","kubernetes.labels.version":"1.8.3","reason":"OOMKilled","exit_code":"137"} diff --git a/tests/fixtures/vl_events/nginx_http_400.json b/tests/fixtures/vl_events/nginx_http_400.json new file mode 100644 index 0000000..809c763 --- /dev/null +++ b/tests/fixtures/vl_events/nginx_http_400.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:12:33.421Z","_stream":"{group=\"web\",host=\"app-host-01\"}","_stream_id":"0000000000000000a1b2c3d4e5f60011","_msg":"400 Bad Request from 192.0.2.17","nginx.http.method":"POST","nginx.http.status":"400","nginx.http.request_id":"req-7f9c2a1b8d4e","nginx.http.path":"/api/v1/events","nginx.http.remote_addr":"192.0.2.17","nginx.http.user_agent":"curl/8.6.0","host":"app-host-01"} diff --git a/tests/fixtures/vl_events/nginx_http_500.json b/tests/fixtures/vl_events/nginx_http_500.json new file mode 100644 index 0000000..b7c8de2 --- /dev/null +++ b/tests/fixtures/vl_events/nginx_http_500.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:14:02.108Z","_stream":"{group=\"web\",host=\"app-host-02\"}","_stream_id":"0000000000000000a1b2c3d4e5f60022","_msg":"500 Internal Server Error while proxying to upstream","nginx.http.method":"GET","nginx.http.status":"500","nginx.http.request_id":"req-1122aabbccdd","nginx.http.path":"/api/v1/render","nginx.http.remote_addr":"198.51.100.42","nginx.http.upstream":"backend-api:8080","host":"app-host-02"} diff --git a/tests/fixtures/vl_events/otel_span.json b/tests/fixtures/vl_events/otel_span.json new file mode 100644 index 0000000..63cb75a --- /dev/null +++ b/tests/fixtures/vl_events/otel_span.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:40:22.340Z","_stream":"{group=\"otel\",service=\"checkout\"}","_stream_id":"0000000000000000f607112233445566","_msg":"span completed: POST /api/checkout","trace_id":"5e8f3c1d92a94fb8b7d6a1e2c3f40517","span_id":"9e3a7b4d15f82c60","parent_span_id":"7a3f8d2e1c4b5a6f","service.name":"checkout","service.version":"2.4.1","http.method":"POST","http.status_code":"200","span.kind":"server","duration_ns":"48210000"} diff --git a/tests/fixtures/vl_events/raw_message_only.json b/tests/fixtures/vl_events/raw_message_only.json new file mode 100644 index 0000000..be09415 --- /dev/null +++ b/tests/fixtures/vl_events/raw_message_only.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:45:11.002Z","_stream":"{group=\"misc\"}","_stream_id":"00000000000000000711223344556677","_msg":"disk space usage at 87% on /var"} diff --git a/tests/fixtures/vl_events/syslog_raw_daemon.json b/tests/fixtures/vl_events/syslog_raw_daemon.json new file mode 100644 index 0000000..1a5970a --- /dev/null +++ b/tests/fixtures/vl_events/syslog_raw_daemon.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T11:20:30.444Z","_stream":"{group=\"syslog\",host=\"app-host-09\"}","_stream_id":"00000000000000007788899aabbccdde","_msg":"<30>Apr 15 11:20:30 app-host-09 chronyd[812]: Selected source 203.0.113.123 (pool.example.test)","facility":"daemon","severity":"info","appname":"chronyd","host":"app-host-09"} diff --git a/tests/fixtures/vl_events/windows_security_logoff.json b/tests/fixtures/vl_events/windows_security_logoff.json new file mode 100644 index 0000000..9350067 --- /dev/null +++ b/tests/fixtures/vl_events/windows_security_logoff.json @@ -0,0 +1 @@ +{"_time":"2026-04-15T10:31:09.550Z","_stream":"{group=\"windows\",host=\"WIN-APP-01\"}","_stream_id":"0000000000000000d4e5f60711223344","_msg":"An account was logged off.\r\n\r\nSubject:\r\n\tSecurity ID:\t\tS-1-5-21-0-0-0-1001\r\n\tAccount Name:\t\ttestuser\r\n\tAccount Domain:\t\tEXAMPLE\r\n\tLogon ID:\t\t0x3E7","EventID":"4634","Channel":"Security","Computer":"WIN-APP-01.example.test","Provider":"Microsoft-Windows-Security-Auditing","LogonType":"3","TargetUserName":"testuser","TargetDomainName":"EXAMPLE"} diff --git a/tests/integration_notify.rs b/tests/integration_notify.rs index d28900d..2aefb26 100644 --- a/tests/integration_notify.rs +++ b/tests/integration_notify.rs @@ -27,6 +27,7 @@ fn make_payload_with_destinations(rule_name: &str, destinations: Vec) -> accent_color: Some("#ff0000".to_string()), }, rule_name: rule_name.to_string(), + vl_source: "vlprod".to_string(), destinations, log_timestamp: "2026-01-15T10:49:35.799Z".to_string(), log_timestamp_formatted: "15/01/2026 10:49:35 UTC".to_string(), @@ -207,7 +208,7 @@ async fn test_mattermost_payload_format() { "fallback": "Alert from format_rule", "title": "Alert from format_rule", "text": "Test body content", - "footer": "valerter | format_rule | 15/01/2026 10:49:35 UTC" + "footer": "valerter | format_rule | vlprod | 15/01/2026 10:49:35 UTC" }] }))) .respond_with(ResponseTemplate::new(200)) diff --git a/tests/integration_streaming.rs b/tests/integration_streaming.rs index 5da5fb3..4ae8d27 100644 --- a/tests/integration_streaming.rs +++ b/tests/integration_streaming.rs @@ -53,7 +53,10 @@ async fn test_streaming_basic_single_line() { let config = create_config(&mock_server, "_stream:test"); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 1); assert!(lines[0].contains("test log")); @@ -80,7 +83,10 @@ async fn test_streaming_multiple_lines() { let config = create_config(&mock_server, "_stream:multi"); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 3); assert!(lines[0].contains("log 1")); @@ -101,7 +107,10 @@ async fn test_streaming_empty_response() { let config = create_config(&mock_server, "_stream:empty"); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert!(lines.is_empty()); } @@ -123,7 +132,7 @@ async fn test_connection_error_http_500() { let config = create_config(&mock_server, "_stream:error"); let mut client = TailClient::new(config).unwrap(); - let result = client.connect_and_receive("test_rule").await; + let result = client.connect_and_receive("test_rule", "default").await; assert!(result.is_err()); match result { @@ -147,7 +156,7 @@ async fn test_connection_error_http_404() { let config = create_config(&mock_server, "_stream:notfound"); let mut client = TailClient::new(config).unwrap(); - let result = client.connect_and_receive("test_rule").await; + let result = client.connect_and_receive("test_rule", "default").await; assert!(result.is_err()); match result { @@ -171,7 +180,7 @@ async fn test_connection_error_http_503() { let config = create_config(&mock_server, "_stream:unavailable"); let mut client = TailClient::new(config).unwrap(); - let result = client.connect_and_receive("test_rule").await; + let result = client.connect_and_receive("test_rule", "default").await; assert!(result.is_err()); match result { @@ -196,7 +205,7 @@ async fn test_connection_error_server_down() { let mut client = TailClient::new(config).unwrap(); - let result = client.connect_and_receive("test_rule").await; + let result = client.connect_and_receive("test_rule", "default").await; assert!(result.is_err()); match result { @@ -231,7 +240,7 @@ async fn test_timeout_detection() { let mut client = TailClient::new(config).unwrap(); // This won't actually timeout since delay is short, but tests the path - let result = client.connect_and_receive("test_rule").await; + let result = client.connect_and_receive("test_rule", "default").await; // Should succeed (data received before timeout) // The buffer will hold "incomplete" without newline @@ -287,7 +296,7 @@ async fn test_url_construction_is_correct() { }; let mut client = TailClient::new(config).unwrap(); - let _ = client.connect_and_receive("test_rule").await; + let _ = client.connect_and_receive("test_rule", "default").await; // If we get here without panic, the URL matched } @@ -315,7 +324,7 @@ async fn test_url_with_start_param() { }; let mut client = TailClient::new(config).unwrap(); - let _ = client.connect_and_receive("test_rule").await; + let _ = client.connect_and_receive("test_rule", "default").await; } // ============================================================================= @@ -338,7 +347,7 @@ async fn test_headers_are_set_correctly() { let config = create_config(&mock_server, "_stream:headers"); let mut client = TailClient::new(config).unwrap(); - let _ = client.connect_and_receive("test_rule").await; + let _ = client.connect_and_receive("test_rule", "default").await; } // ============================================================================= @@ -365,7 +374,10 @@ async fn test_streaming_with_utf8_content() { let config = create_config(&mock_server, "_stream:utf8"); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 2); assert!(lines[0].contains("Café")); @@ -395,7 +407,7 @@ impl TestReconnectCallback { } impl ReconnectCallback for TestReconnectCallback { - fn on_reconnect(&self, _rule_name: &str) { + fn on_reconnect(&self, _rule_name: &str, _vl_source: &str) { self.count.fetch_add(1, Ordering::SeqCst); } } @@ -423,7 +435,7 @@ async fn test_stream_with_reconnect_receives_lines() { // Use tokio::time::timeout to prevent infinite loop let result = tokio::time::timeout(Duration::from_millis(500), async { client - .stream_with_reconnect("test_rule", None, |line| { + .stream_with_reconnect("test_rule", "default", None, |line| { let lines = Arc::clone(&lines_clone); async move { lines.lock().unwrap().push(line); @@ -474,7 +486,7 @@ async fn test_stream_with_reconnect_retries_on_error() { // Use short timeout - should get at least one retry and one success let _ = tokio::time::timeout(Duration::from_secs(3), async { client - .stream_with_reconnect("test_rule", Some(&callback), |line| { + .stream_with_reconnect("test_rule", "default", Some(&callback), |line| { let lines = Arc::clone(&lines_clone); async move { lines.lock().unwrap().push(line); @@ -505,15 +517,15 @@ async fn test_stream_with_reconnect_retries_on_error() { #[test] fn test_log_reconnection_attempt_does_not_panic() { // Just verify the function can be called without panic - log_reconnection_attempt("test_rule", 0, Duration::from_secs(1)); - log_reconnection_attempt("test_rule", 5, Duration::from_secs(32)); - log_reconnection_attempt("test_rule", 10, Duration::from_secs(60)); + log_reconnection_attempt("test_rule", "default", 0, Duration::from_secs(1)); + log_reconnection_attempt("test_rule", "default", 5, Duration::from_secs(32)); + log_reconnection_attempt("test_rule", "default", 10, Duration::from_secs(60)); } #[test] fn test_log_reconnection_success_does_not_panic() { // Just verify the function can be called without panic - log_reconnection_success("test_rule"); + log_reconnection_success("test_rule", "default"); } #[test] @@ -521,10 +533,10 @@ fn test_reconnect_callback_trait() { let callback = TestReconnectCallback::new(); assert_eq!(callback.reconnect_count(), 0); - callback.on_reconnect("rule1"); + callback.on_reconnect("rule1", "vlprod"); assert_eq!(callback.reconnect_count(), 1); - callback.on_reconnect("rule2"); + callback.on_reconnect("rule2", "vldev"); assert_eq!(callback.reconnect_count(), 2); } @@ -598,7 +610,10 @@ async fn test_basic_auth_header_is_sent() { let config = create_config_with_basic_auth(&mock_server, "testuser", "testpass"); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 1); assert!(lines[0].contains("authenticated")); @@ -633,7 +648,10 @@ async fn test_custom_headers_are_sent() { let config = create_config_with_headers(&mock_server, headers); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 1); assert!(lines[0].contains("headers received")); @@ -663,7 +681,10 @@ async fn test_bearer_token_in_header() { let config = create_config_with_headers(&mock_server, headers); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 1); assert!(lines[0].contains("bearer auth ok")); @@ -706,7 +727,10 @@ async fn test_basic_auth_with_custom_headers_combined() { let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 1); assert!(lines[0].contains("combined auth ok")); @@ -734,7 +758,7 @@ async fn test_basic_auth_401_on_wrong_credentials() { let config = create_config_with_basic_auth(&mock_server, "wrong_user", "wrong_pass"); let mut client = TailClient::new(config).unwrap(); - let result = client.connect_and_receive("test_rule").await; + let result = client.connect_and_receive("test_rule", "default").await; assert!(result.is_err()); match result { @@ -763,7 +787,10 @@ async fn test_without_auth_no_authorization_header() { let config = create_config(&mock_server, "_stream:noauth"); let mut client = TailClient::new(config).unwrap(); - let lines = client.connect_and_receive("test_rule").await.unwrap(); + let lines = client + .connect_and_receive("test_rule", "default") + .await + .unwrap(); assert_eq!(lines.len(), 1); assert!(lines[0].contains("no auth")); diff --git a/tests/integration_validate.rs b/tests/integration_validate.rs index 70dab3f..2f994b2 100644 --- a/tests/integration_validate.rs +++ b/tests/integration_validate.rs @@ -57,8 +57,8 @@ fn validate_valid_config_exits_success() { stdout ); assert!( - stdout.contains("VictoriaLogs URL"), - "Output should show VictoriaLogs URL: {}", + stdout.contains("VictoriaLogs sources"), + "Output should show VictoriaLogs sources summary: {}", stdout ); assert!( @@ -277,6 +277,87 @@ fn validate_mattermost_env_var_ignored() { ); } +// Test: shipped config/config.example.yaml passes --validate +// Locks the canonical example against future schema drift. +#[test] +fn shipped_config_example_validates() { + ensure_binary_built(); + + let config_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("config") + .join("config.example.yaml"); + + let output = Command::new(valerter_binary()) + .args(["--validate", "-c"]) + .arg(&config_path) + .output() + .expect("Failed to run valerter"); + + assert!( + output.status.success(), + "shipped config '{}' must pass --validate (exit 0)\nstdout: {}\nstderr: {}", + config_path.display(), + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); +} + +// Test: every shipped examples//config.yaml passes --validate +// Iterates so new example folders are picked up automatically. +// Sets placeholder env vars so examples that reference ${...} secrets validate +// without hitting the network (--validate does not actually call any backend). +#[test] +fn shipped_examples_pass_validate() { + ensure_binary_built(); + + let examples_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples"); + + let entries = std::fs::read_dir(&examples_dir).expect("Failed to read examples/ directory"); + + let mut checked = 0usize; + for entry in entries { + let entry = entry.expect("Failed to read examples/ entry"); + let path = entry.path(); + if !path.is_dir() { + continue; + } + let config_path = path.join("config.yaml"); + if !config_path.exists() { + continue; + } + + let output = Command::new(valerter_binary()) + .args(["--validate", "-c"]) + .arg(&config_path) + // Placeholders for examples that reference ${VAR} secrets. + // --validate does not perform any network call, so values can be dummies. + .env("WEBHOOK_URL", "https://mattermost.example.com/hooks/dummy") + .env("VL_PROD_USER", "dummy_user") + .env("VL_PROD_PASS", "dummy_pass") + .env("VL_PROD_TOKEN", "dummy_token") + .env("SMTP_USER", "dummy_smtp_user") + .env("SMTP_PASSWORD", "dummy_smtp_pass") + .env("TELEGRAM_BOT_TOKEN", "dummy_bot_token") + .output() + .expect("Failed to run valerter"); + + assert!( + output.status.success(), + "shipped example '{}' must pass --validate (exit 0)\nstdout: {}\nstderr: {}", + config_path.display(), + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + checked += 1; + } + + assert!( + checked >= 1, + "expected at least one examples//config.yaml to validate, found 0 in {}", + examples_dir.display() + ); +} + // Test: --validate with minimal config (README example) passes #[test] fn validate_minimal_config_exits_success() { diff --git a/tests/metrics_snapshot.rs b/tests/metrics_snapshot.rs new file mode 100644 index 0000000..b8b5003 --- /dev/null +++ b/tests/metrics_snapshot.rs @@ -0,0 +1,330 @@ +//! `/metrics` snapshot test for the multi-source observability work +//! (v2.0.0 part 2). +//! +//! Spins up a 2-source 1-rule engine plus the real Prometheus metrics +//! exporter on an ephemeral port, exercises every per-rule metric path +//! once (alert sent, alert throttled, parse error, log matched), then +//! scrapes `/metrics` and asserts the **set of metric names + label keys** +//! against an inline expected list. Values and timestamps are intentionally +//! ignored — the test catches accidental metric rename/relabel in future PRs +//! without coupling to runtime numbers. +//! +//! ## Why a separate integration test binary +//! +//! `metrics-exporter-prometheus` installs a global recorder via +//! `PrometheusBuilder::install()`, which can only run once per process. Each +//! integration test binary gets its own process, so this file owns the +//! recorder for its run and does not race with `src/metrics.rs` unit tests +//! or other integration suites. + +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::sync::Arc; +use std::time::Duration; + +use serde_json::Value; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; +use valerter::config::{ + CompiledParser, CompiledRule, CompiledTemplate, DEFAULT_MAX_STREAMS, DefaultsConfig, + JsonParserConfig, MetricsConfig, NotifyConfig, RuntimeConfig, ThrottleConfig, VlSourceConfig, +}; +use valerter::notify::{AlertPayload, NotificationQueue}; +use valerter::{MetricsServer, RuleEngine}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +/// Re-serialize a JSON value into NDJSON (one event + trailing newline). +fn ndjson_body(events: &[&Value]) -> Vec { + let mut out = Vec::new(); + for ev in events { + out.extend_from_slice( + serde_json::to_vec(ev) + .expect("fixture is valid JSON") + .as_slice(), + ); + out.push(b'\n'); + } + out +} + +async fn mount_ndjson(server: &MockServer, events: &[&Value]) { + Mock::given(method("GET")) + .and(path("/select/logsql/tail")) + .respond_with( + ResponseTemplate::new(200).set_body_raw(ndjson_body(events), "application/x-ndjson"), + ) + .mount(server) + .await; +} + +fn rule(name: &str, vl_sources: Vec, throttle_count: u32) -> CompiledRule { + CompiledRule { + name: name.to_string(), + enabled: true, + query: "_stream:test".to_string(), + parser: CompiledParser { + regex: None, + json: Some(JsonParserConfig { + fields: vec!["_msg".to_string()], + }), + }, + throttle: Some(valerter::config::CompiledThrottle { + key_template: None, + count: throttle_count, + window: Duration::from_secs(60), + }), + notify: NotifyConfig { + template: "tpl".to_string(), + mattermost_channel: None, + destinations: vec!["dest".to_string()], + }, + vl_sources, + } +} + +fn vl_source(uri: &str) -> VlSourceConfig { + VlSourceConfig { + url: uri.to_string(), + basic_auth: None, + headers: None, + tls: None, + } +} + +fn runtime(sources: BTreeMap, rules: Vec) -> RuntimeConfig { + let mut templates = std::collections::HashMap::new(); + templates.insert( + "tpl".to_string(), + CompiledTemplate { + title: "{{ rule_name }}@{{ vl_source }}".to_string(), + body: "{{ _msg }}".to_string(), + email_body_html: None, + accent_color: None, + }, + ); + + RuntimeConfig { + victorialogs: sources, + defaults: DefaultsConfig { + throttle: ThrottleConfig { + key: None, + count: 5, + window: Duration::from_secs(60), + }, + timestamp_timezone: "UTC".to_string(), + max_streams: DEFAULT_MAX_STREAMS, + }, + templates, + rules, + metrics: MetricsConfig::default(), + notifiers: None, + config_dir: std::path::PathBuf::from("."), + } +} + +async fn drain(rx: &mut broadcast::Receiver, max: usize, deadline: Duration) { + let mut got = 0; + let _ = tokio::time::timeout(deadline, async { + while got < max { + match rx.recv().await { + Ok(_) => got += 1, + Err(_) => break, + } + } + }) + .await; +} + +/// Parse a Prometheus exposition body and return the set of `name{labelkeys}` +/// strings. Label *values* and metric *values* are stripped; only the metric +/// identifier and the **sorted set of label keys** are kept. This is exactly +/// what we want to catch accidental rename/relabel without coupling to +/// counter values, timestamps, or how many label-value combinations exist. +fn extract_name_label_keys(body: &str) -> BTreeSet { + let mut out = BTreeSet::new(); + for line in body.lines() { + let line = line.trim(); + if line.is_empty() || line.starts_with('#') { + continue; + } + // Extract name and (optional) labels block. Format: `name{k="v",...} value` + // or `name value`. + let (head, _) = match line.split_once(' ') { + Some(parts) => parts, + None => continue, + }; + let (name, label_keys) = if let Some(brace) = head.find('{') { + let name = &head[..brace]; + let labels_str = &head[brace + 1..head.len() - 1]; + let mut keys: Vec<&str> = labels_str + .split(',') + .filter_map(|kv| kv.split_once('=').map(|(k, _)| k)) + .collect(); + keys.sort(); + keys.dedup(); + (name.to_string(), keys.join(",")) + } else { + (head.to_string(), String::new()) + }; + if label_keys.is_empty() { + out.insert(name); + } else { + out.insert(format!("{}{{{}}}", name, label_keys)); + } + } + out +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn metrics_snapshot_two_sources_one_rule() { + // 1) Mock 2 VL sources. Source `vlprod` serves a parseable event so the + // engine drives the throttle + alert path. Source `vldev` serves a + // line that fails JSON parsing, exercising the parse-error path. + let vlprod = MockServer::start().await; + let vldev = MockServer::start().await; + + let good_event: Value = serde_json::json!({ + "_time": "2026-04-15T10:00:00Z", + "_stream": "{}", + "_msg": "ok", + }); + mount_ndjson(&vlprod, &[&good_event, &good_event, &good_event]).await; + + // For vldev, serve raw garbage so the parser increments + // `valerter_parse_errors_total{rule_name, vl_source, error_type}`. + Mock::given(method("GET")) + .and(path("/select/logsql/tail")) + .respond_with( + ResponseTemplate::new(200) + .set_body_raw(b"this is not json\n".to_vec(), "application/x-ndjson"), + ) + .mount(&vldev) + .await; + + let mut sources = BTreeMap::new(); + sources.insert("vlprod".to_string(), vl_source(&vlprod.uri())); + sources.insert("vldev".to_string(), vl_source(&vldev.uri())); + + // Throttle count=1 on the only rule so the second event on `vlprod` + // also exercises the throttled path. + let rules = vec![rule("snapshot_rule", Vec::new(), 1)]; + let cfg = runtime(sources, rules); + + // 2) Boot the metrics server on an ephemeral port. The recorder install + // is a one-shot global; subsequent tests in this same binary cannot + // install it again, which is why this file is a dedicated integration + // test. + let port = portpicker::pick_unused_port().expect("free port"); + let cancel = CancellationToken::new(); + let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); + let metrics_cancel = cancel.clone(); + let metrics_handle = tokio::spawn(async move { + let server = MetricsServer::with_ready_signal(port, ready_tx); + let _ = server.run(metrics_cancel).await; + }); + ready_rx.await.expect("metrics server should signal ready"); + + // 3) Initialize all known metric series so the snapshot is deterministic + // even before counters tick. Mirrors the call valerter's main does. + let rule_source_pairs: Vec<(&str, &str)> = + vec![("snapshot_rule", "vlprod"), ("snapshot_rule", "vldev")]; + let source_names: Vec<&str> = vec!["vlprod", "vldev"]; + // Pass at least one notifier so the per-notifier sentinel counters + // (`alerts_failed_total{notifier}` / `notify_errors_total{notifier}`) + // are seeded and the snapshot can assert their presence. + let notifier_names: Vec<&str> = vec!["sentinel"]; + valerter::initialize_metrics(&rule_source_pairs, &source_names, ¬ifier_names); + + // 4) Run the engine briefly so each metric path fires at least once. + let queue = NotificationQueue::new(64); + let mut rx = queue.subscribe(); + let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone()); + let cancel_for_engine = cancel.clone(); + let engine_handle = tokio::spawn(async move { engine.run(cancel_for_engine).await }); + + // Drain a few alerts to make sure the throttle/passed/sent paths run. + drain(&mut rx, 5, Duration::from_secs(2)).await; + + // 5) Scrape /metrics. + let url = format!("http://127.0.0.1:{}/metrics", port); + let body = reqwest::Client::new() + .get(&url) + .send() + .await + .expect("scrape should succeed") + .text() + .await + .expect("body should decode"); + + // 6) Tear down. The engine task runs forever until cancelled. + cancel.cancel(); + let _ = tokio::time::timeout(Duration::from_secs(2), engine_handle).await; + let _ = tokio::time::timeout(Duration::from_secs(1), metrics_handle).await; + + // 7) Assert the snapshot. We check that *every expected* metric series + // (name + label-key set) is present. The actual output may carry + // additional series from per-(rule, source) initialization that we + // explicitly seeded, so we tolerate supersets. + let actual = extract_name_label_keys(&body); + + // Inline expected snapshot. Sorted alphabetically for stable diffs. + // Each entry is `metric_name{label_keys_csv_sorted}` or just + // `metric_name` when unlabeled. + let expected: Arc<[&'static str]> = Arc::from([ + // Per-(rule, source) counters seeded by initialize_metrics. + "valerter_alerts_passed_total{rule_name,vl_source}", + "valerter_alerts_sent_total{rule_name,vl_source}", + "valerter_alerts_throttled_total{rule_name,vl_source}", + "valerter_logs_matched_total{rule_name,vl_source}", + "valerter_parse_errors_total{rule_name,vl_source}", + "valerter_reconnections_total{rule_name,vl_source}", + "valerter_rule_errors_total{rule_name,vl_source}", + "valerter_rule_panics_total{rule_name,vl_source}", + // Per-(rule, source) discarded counter (3-label, reason="oversized"). + "valerter_lines_discarded_total{reason,rule_name,vl_source}", + // Per-(rule, source) gauge for last query timestamp. + "valerter_last_query_timestamp{rule_name,vl_source}", + // Per-(rule, source) histogram exported as a Prometheus summary by + // metrics-exporter-prometheus: emits the metric with `quantile` label + // plus `_sum` and `_count` companion series. + "valerter_query_duration_seconds{quantile,rule_name,vl_source}", + "valerter_query_duration_seconds_count{rule_name,vl_source}", + "valerter_query_duration_seconds_sum{rule_name,vl_source}", + // Per-notifier sentinel counters. + "valerter_alerts_failed_total{notifier}", + "valerter_notify_errors_total{notifier}", + // Global / shared counters & gauges. + "valerter_alerts_dropped_total", + "valerter_queue_size", + "valerter_uptime_seconds", + // Per-source reachability gauge (replaces the old per-rule + // valerter_victorialogs_up). + "valerter_vl_source_up{vl_source}", + // Build info carries only the version label. + "valerter_build_info{version}", + ]); + + let mut missing: Vec = Vec::new(); + for want in expected.iter() { + if !actual.contains(*want) { + missing.push((*want).to_string()); + } + } + assert!( + missing.is_empty(), + "metrics snapshot missing expected series:\n missing = {:#?}\n\n actual = {:#?}\n\n raw body =\n{}", + missing, + actual, + body + ); + + // Hard regression: the v1.x per-rule gauge MUST be gone in v2.0.0. + assert!( + !actual + .iter() + .any(|s| s.starts_with("valerter_victorialogs_up")), + "valerter_victorialogs_up must be removed in v2.0.0 (replaced by valerter_vl_source_up). Found in /metrics:\n{}", + body + ); +} diff --git a/tests/multi_source_integration.rs b/tests/multi_source_integration.rs new file mode 100644 index 0000000..e964532 --- /dev/null +++ b/tests/multi_source_integration.rs @@ -0,0 +1,393 @@ +//! End-to-end integration test for the v2.0.0 multi-source VL core. +//! +//! Spins up **two** wiremock `MockServer` instances to stand in for two +//! VictoriaLogs sources (`vlprod`, `vldev`), serves distinct fixture events +//! on each, runs `RuleEngine` against the pair, and inspects the alert +//! payloads arriving on the notification queue to prove: +//! +//! 1. A rule with `vl_sources: [vlprod]` spawns exactly one task and its +//! payloads carry `vl_source == "vlprod"`. +//! 2. A rule with empty `vl_sources` fans out across every source and +//! payloads arrive tagged with each source name. +//! 3. The synthetic `vl_source` field is rendered in template output +//! (layer 1) and survives in `AlertPayload` for layer 2 notifiers. +//! 4. Per-source default throttle buckets are isolated: two sources sending +//! identical events both pass through on first delivery rather than the +//! second being dropped as a duplicate. +//! +//! The fixture corpus from `tests/fixtures/vl_events/` (chore/vl-fixtures-corpus) +//! is consumed via `common::vl_events::load_fixture`. + +mod common; + +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Duration; + +use serde_json::Value; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; +use valerter::config::{ + CompiledParser, CompiledRule, CompiledTemplate, DefaultsConfig, JsonParserConfig, + MetricsConfig, NotifyConfig, RuntimeConfig, ThrottleConfig, VlSourceConfig, +}; +use valerter::notify::{AlertPayload, NotificationQueue}; +use valerter::{RuleEngine, TemplateEngine}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +use common::vl_events::load_fixture; + +/// Re-serialize a JSON fixture (which in the corpus is a single object) into +/// an NDJSON response body — one JSON object followed by a trailing newline. +fn ndjson_body(events: &[&Value]) -> Vec { + let mut out = Vec::new(); + for ev in events { + out.extend_from_slice( + serde_json::to_vec(ev) + .expect("fixture is valid JSON") + .as_slice(), + ); + out.push(b'\n'); + } + out +} + +/// Build an NDJSON mock that replies once with the given events. +async fn mount_ndjson(server: &MockServer, events: &[&Value]) { + Mock::given(method("GET")) + .and(path("/select/logsql/tail")) + .respond_with( + ResponseTemplate::new(200).set_body_raw(ndjson_body(events), "application/x-ndjson"), + ) + .mount(server) + .await; +} + +fn rule(name: &str, vl_sources: Vec) -> CompiledRule { + CompiledRule { + name: name.to_string(), + enabled: true, + query: "_stream:test".to_string(), + parser: CompiledParser { + regex: None, + json: Some(JsonParserConfig { + fields: vec!["_msg".to_string()], + }), + }, + throttle: None, + notify: NotifyConfig { + template: "tpl".to_string(), + mattermost_channel: None, + destinations: vec!["dest".to_string()], + }, + vl_sources, + } +} + +fn vl_source(uri: &str) -> VlSourceConfig { + VlSourceConfig { + url: uri.to_string(), + basic_auth: None, + headers: None, + tls: None, + } +} + +fn runtime(sources: BTreeMap, rules: Vec) -> RuntimeConfig { + let mut templates = std::collections::HashMap::new(); + templates.insert( + "tpl".to_string(), + CompiledTemplate { + title: "[{{ vl_source }}] {{ rule_name }}".to_string(), + body: "source={{ vl_source }} msg={{ _msg }}".to_string(), + email_body_html: None, + accent_color: None, + }, + ); + + RuntimeConfig { + victorialogs: sources, + defaults: DefaultsConfig { + throttle: ThrottleConfig { + key: None, + count: 5, + window: Duration::from_secs(60), + }, + timestamp_timezone: "UTC".to_string(), + max_streams: valerter::config::DEFAULT_MAX_STREAMS, + }, + templates, + rules, + metrics: MetricsConfig::default(), + notifiers: None, + config_dir: std::path::PathBuf::from("."), + } +} + +/// Collect up to `max` alerts from a pre-created receiver within `deadline`, +/// returning whatever arrived. Using a receiver created BEFORE the engine +/// spawns avoids the broadcast channel's "messages before subscribe are lost" +/// behaviour (see tokio::sync::broadcast docs). +async fn drain_from( + rx: &mut broadcast::Receiver, + max: usize, + deadline: Duration, +) -> Vec { + let mut out = Vec::new(); + let _ = tokio::time::timeout(deadline, async { + while out.len() < max { + match rx.recv().await { + Ok(p) => out.push(p), + Err(_) => break, + } + } + }) + .await; + out +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multi_source_rule_with_vl_sources_list_targets_single_source() { + let vlprod = MockServer::start().await; + let vldev = MockServer::start().await; + + let ev_prod = load_fixture("nginx_http_400.json"); + let ev_dev = load_fixture("nginx_http_500.json"); + + mount_ndjson(&vlprod, &[&ev_prod]).await; + mount_ndjson(&vldev, &[&ev_dev]).await; + + let mut sources = BTreeMap::new(); + sources.insert("vlprod".to_string(), vl_source(&vlprod.uri())); + sources.insert("vldev".to_string(), vl_source(&vldev.uri())); + + // Rule pinned to vlprod only. vldev's stream must never produce an alert + // via this rule. + let rules = vec![rule("prod_only", vec!["vlprod".to_string()])]; + + let queue = NotificationQueue::new(64); + // Subscribe BEFORE spawning the engine so no payloads are lost. + let mut rx = queue.subscribe(); + + let cfg = runtime(sources, rules); + let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone()); + + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + let handle = tokio::spawn(async move { engine.run(cancel_clone).await }); + + let alerts = drain_from(&mut rx, 1, Duration::from_secs(3)).await; + // Negative-evidence proof: vldev MUST NOT have served any tail request, + // since the only rule is pinned to vlprod. Catches regressions where the + // resolve_sources filter is bypassed and the rule fans out anyway. + let vldev_hits = vldev.received_requests().await.unwrap_or_default(); + cancel.cancel(); + let _ = tokio::time::timeout(Duration::from_secs(1), handle).await; + + assert!( + !alerts.is_empty(), + "rule pinned to vlprod should have produced at least one alert" + ); + for a in &alerts { + assert_eq!( + a.vl_source, "vlprod", + "payload carried wrong vl_source: {}", + a.vl_source + ); + assert!( + a.message.title.contains("vlprod"), + "layer 1 title must contain rendered vl_source, got: {}", + a.message.title + ); + } + assert!( + vldev_hits.is_empty(), + "vldev served {} request(s) for a rule pinned to vlprod (negative-evidence assertion)", + vldev_hits.len() + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multi_source_rule_without_vl_sources_fans_out_across_all() { + let vlprod = MockServer::start().await; + let vldev = MockServer::start().await; + + let ev = load_fixture("k8s_pod_oom.json"); + mount_ndjson(&vlprod, &[&ev]).await; + mount_ndjson(&vldev, &[&ev]).await; + + let mut sources = BTreeMap::new(); + sources.insert("vlprod".to_string(), vl_source(&vlprod.uri())); + sources.insert("vldev".to_string(), vl_source(&vldev.uri())); + + // vl_sources empty = fan out across all sources. + let rules = vec![rule("fan_out", Vec::new())]; + + let queue = NotificationQueue::new(64); + let mut rx = queue.subscribe(); + let cfg = runtime(sources, rules); + let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone()); + + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + let handle = tokio::spawn(async move { engine.run(cancel_clone).await }); + + // Drain a fixed time window with a high `max` so we don't exit before + // the slower of the two parallel source tasks delivers its first alert. + let alerts = drain_from(&mut rx, 200, Duration::from_secs(2)).await; + cancel.cancel(); + let _ = tokio::time::timeout(Duration::from_secs(1), handle).await; + + let mut sources_seen: std::collections::HashSet = Default::default(); + for a in &alerts { + sources_seen.insert(a.vl_source.clone()); + } + + assert!( + sources_seen.contains("vlprod"), + "expected vlprod alert, got sources: {:?}", + sources_seen + ); + assert!( + sources_seen.contains("vldev"), + "expected vldev alert, got sources: {:?}", + sources_seen + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multi_source_default_throttle_buckets_are_isolated_per_source() { + // The same event delivered twice on two different sources must not be + // deduped as a single bucket when the rule uses the default throttle + // (no custom key). The default key is `{rule}-{source}:global`, so + // each source has its own bucket and both alerts should land. + let vlprod = MockServer::start().await; + let vldev = MockServer::start().await; + + // Low count (=1) would cause a cross-source collision under v1 default. + let ev = load_fixture("nginx_http_500.json"); + mount_ndjson(&vlprod, &[&ev]).await; + mount_ndjson(&vldev, &[&ev]).await; + + let mut sources = BTreeMap::new(); + sources.insert("vlprod".to_string(), vl_source(&vlprod.uri())); + sources.insert("vldev".to_string(), vl_source(&vldev.uri())); + + // Tight throttle count=1: if buckets were shared the second source + // would be blocked. + let mut cfg = runtime(sources, vec![rule("isolate", Vec::new())]); + cfg.defaults.throttle.count = 1; + + let queue = NotificationQueue::new(64); + let mut rx = queue.subscribe(); + let engine = RuleEngine::new(cfg, reqwest::Client::new(), queue.clone()); + + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + let handle = tokio::spawn(async move { engine.run(cancel_clone).await }); + + // Drain for a fixed window long enough for both sources' first stream + // to land. With throttle count=1 each (rule, source) bucket allows only + // one alert through, but both buckets are independent so both deliver. + // Use a high `max` so we don't exit early before both sources land. + let alerts = drain_from(&mut rx, 100, Duration::from_secs(2)).await; + cancel.cancel(); + let _ = tokio::time::timeout(Duration::from_secs(1), handle).await; + + let sources_seen: std::collections::HashSet = + alerts.iter().map(|a| a.vl_source.clone()).collect(); + + assert!( + sources_seen.contains("vlprod") && sources_seen.contains("vldev"), + "per-source default throttle buckets must be isolated; saw: {:?} (alerts: {})", + sources_seen, + alerts.len() + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multi_source_event_field_named_vl_source_is_masked_by_synthetic() { + // If an event literally carries a `vl_source` field, the synthetic + // value wins in layer 1 and in the AlertPayload (collision policy + // matches rule_name, v1.2.1). + let server = MockServer::start().await; + + // Build an event with a hostile literal vl_source value. Use a minimal + // VL shape: _time, _stream, _msg, plus the collision field. + let hostile: Value = serde_json::json!({ + "_time": "2026-04-15T10:00:00Z", + "_stream": "{}", + "_msg": "hostile", + "vl_source": "evil" + }); + mount_ndjson(&server, &[&hostile]).await; + + let mut sources = BTreeMap::new(); + sources.insert("real_source".to_string(), vl_source(&server.uri())); + + let rules = vec![rule("collision", Vec::new())]; + + let queue = NotificationQueue::new(64); + let mut rx = queue.subscribe(); + let engine = RuleEngine::new( + runtime(sources, rules), + reqwest::Client::new(), + queue.clone(), + ); + + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + let handle = tokio::spawn(async move { engine.run(cancel_clone).await }); + + let alerts = drain_from(&mut rx, 1, Duration::from_secs(3)).await; + cancel.cancel(); + let _ = tokio::time::timeout(Duration::from_secs(1), handle).await; + + assert!(!alerts.is_empty(), "expected at least one alert"); + for a in &alerts { + assert_eq!( + a.vl_source, "real_source", + "AlertPayload.vl_source must be synthetic, not event-literal 'evil'" + ); + assert!( + a.message.title.contains("real_source"), + "layer 1 title must show synthetic vl_source, got: {}", + a.message.title + ); + assert!( + !a.message.title.contains("evil"), + "title must not leak event-literal 'evil' into rendered output: {}", + a.message.title + ); + } +} + +#[tokio::test] +async fn template_engine_renders_vl_source_directly_without_http() { + // Smoke test the template-level contract independently of the engine + // so that if the integration tests above time out in CI under load we + // still have a fast unit-level guarantee that vl_source threads through + // layer 1. This also doubles as an assertion that the fixture corpus is + // consumable from template rendering (guards against future shape drift). + let mut templates = std::collections::HashMap::new(); + templates.insert( + "tpl".to_string(), + CompiledTemplate { + title: "[{{ vl_source }}] {{ rule_name }} {{ _msg }}".to_string(), + body: "b".to_string(), + email_body_html: None, + accent_color: None, + }, + ); + let engine = Arc::new(TemplateEngine::new(templates)); + + let ev = load_fixture("nginx_http_400.json"); + let rendered = engine.render_with_fallback("tpl", &ev, "nginx_rule", "vlprod"); + + assert!( + rendered.title.starts_with("[vlprod] nginx_rule "), + "expected title to include synthetic vl_source, got: {}", + rendered.title + ); +} diff --git a/tests/smtp_integration.rs b/tests/smtp_integration.rs index fd25937..d35c7c3 100644 --- a/tests/smtp_integration.rs +++ b/tests/smtp_integration.rs @@ -217,6 +217,7 @@ fn make_alert_payload(rule_name: &str, title: &str, body: &str) -> AlertPayload accent_color: Some("#ff0000".to_string()), }, rule_name: rule_name.to_string(), + vl_source: "vlprod".to_string(), destinations: vec![], log_timestamp: "2026-01-15T10:49:35.799Z".to_string(), log_timestamp_formatted: "15/01/2026 10:49:35 UTC".to_string(), @@ -404,6 +405,7 @@ async fn test_send_email_html_format() { accent_color: Some("#ff0000".to_string()), }, rule_name: "html_rule".to_string(), + vl_source: "vlprod".to_string(), destinations: vec![], log_timestamp: "2026-01-15T10:49:35.799Z".to_string(), log_timestamp_formatted: "15/01/2026 10:49:35 UTC".to_string(), diff --git a/tests/template_against_vl_fixtures.rs b/tests/template_against_vl_fixtures.rs new file mode 100644 index 0000000..a6a846a --- /dev/null +++ b/tests/template_against_vl_fixtures.rs @@ -0,0 +1,160 @@ +//! Parameterised template × fixture integration tests. +//! +//! Demonstrates how the corpus plugs into the `TemplateEngine` and catches +//! regressions that motivated the chore: +//! - #25: `{{ nginx.http.request_id }}` against flat-dotted keys. +//! - empty-field rendering against `edge_empty_fields.json`. +//! +//! The existing inline-event tests (e.g. `integration_notify.rs`) are left +//! untouched; this file is purely additive. + +use std::collections::HashMap; + +use valerter::config::CompiledTemplate; +use valerter::template::TemplateEngine; + +mod common; + +use common::vl_events::{ + all_fixtures, load_fixture, load_fixtures_by_source_system, load_fixtures_by_tag, +}; + +/// Build a single-template engine wired to `{{ title_tpl }}` / `{{ body_tpl }}`. +fn engine_with(title_tpl: &str, body_tpl: &str) -> TemplateEngine { + let mut templates = HashMap::new(); + templates.insert( + "t".to_string(), + CompiledTemplate { + title: title_tpl.to_string(), + body: body_tpl.to_string(), + email_body_html: None, + accent_color: None, + }, + ); + TemplateEngine::new(templates) +} + +#[test] +fn smoke_every_fixture_renders_msg_and_time_matching_event() { + // For every fixture, render `{{ _msg }} @ {{ _time }}` and assert the + // output matches what the event literally carries. Stronger than a bare + // `!is_empty()` assertion: catches silent drift if templating or unflat- + // tening starts mangling top-level fields. + let engine = engine_with("{{ _msg }}", "{{ _msg }} @ {{ _time }}"); + for (name, value) in all_fixtures() { + let msg = value + .as_object() + .and_then(|o| o.get("_msg")) + .and_then(|v| v.as_str()) + .unwrap_or_else(|| panic!("fixture {} missing string _msg", name)); + let time = value + .as_object() + .and_then(|o| o.get("_time")) + .and_then(|v| v.as_str()) + .unwrap_or_else(|| panic!("fixture {} missing string _time", name)); + let expected = format!("{} @ {}", msg, time); + let rendered = engine + .render("t", &value, "smoke", "vlprod") + .unwrap_or_else(|e| panic!("fixture {} failed to render: {}", name, e)); + assert_eq!( + rendered.body, expected, + "fixture {} rendered body did not match the event's _msg/_time", + name + ); + assert_eq!( + rendered.title, msg, + "fixture {} rendered title drifted", + name + ); + } +} + +#[test] +fn regression_gh25_dotted_keys_render_their_value() { + // Regression guard for #25: `{{ nginx.http.request_id }}` must render + // the flat-dotted value from the event. Before the fix, minijinja + // treated `nginx.http.request_id` as nested attribute lookup and the + // expression resolved to empty under Lenient undefined behaviour. + let engine = engine_with("req", "{{ nginx.http.request_id }}"); + let hits = load_fixtures_by_tag("dotted_keys"); + assert!( + !hits.is_empty(), + "expected at least one `dotted_keys` fixture for #25 regression" + ); + let mut checked_any = false; + for (name, value) in hits { + // Skip fixtures that don't carry the specific dotted key we test. + let expected = value + .as_object() + .and_then(|o| o.get("nginx.http.request_id")) + .and_then(|v| v.as_str()); + let Some(expected) = expected else { + continue; + }; + let rendered = engine + .render("t", &value, "gh25", "vlprod") + .unwrap_or_else(|e| panic!("fixture {} failed to render for #25: {}", name, e)); + assert_eq!( + rendered.body, expected, + "fixture {} did not surface nginx.http.request_id via template", + name + ); + checked_any = true; + } + assert!( + checked_any, + "no dotted_keys fixture carried `nginx.http.request_id`; add one or \ + retag the existing nginx fixtures" + ); +} + +#[test] +fn regression_empty_fields_render_as_empty_string() { + // `edge_empty_fields.json` carries `request_id: ""`. Templates that + // reference these must render to empty string (not fail, not produce + // `"None"`, etc.). This is the contract the empty-guard in #26 relies on. + let engine = engine_with("t", "[{{ request_id }}][{{ user_id }}][{{ error }}]"); + let event = load_fixture("edge_empty_fields.json"); + let rendered = engine + .render("t", &event, "empty_fields", "vlprod") + .expect("render should succeed with lenient undefined"); + assert_eq!(rendered.body, "[][][]"); +} + +#[test] +fn raw_source_fixtures_render_missing_as_empty_under_lenient() { + // Fixtures whose manifest source_system is `raw` carry no structured + // fields beyond the VL envelope. Rendering `{{ _msg }}` must still + // work, and rendering `{{ missing_field }}` must yield empty under + // Lenient (not error). This protects against accidental Strict flips. + let engine = engine_with("{{ _msg }}", "{{ unknown_field_that_does_not_exist }}"); + let fixtures = load_fixtures_by_source_system("raw"); + assert!( + !fixtures.is_empty(), + "expected at least one fixture with source_system: raw" + ); + for (name, value) in fixtures { + let rendered = engine + .render("t", &value, "raw", "vlprod") + .unwrap_or_else(|e| panic!("fixture {} failed lenient render: {}", name, e)); + assert!(!rendered.title.is_empty(), "title empty for {}", name); + assert!( + rendered.body.is_empty(), + "body should be empty string for missing field (fixture {})", + name + ); + } +} + +#[test] +fn unicode_fixture_preserves_cjk_and_emoji() { + // Render the unicode fixture via `{{ _msg }}` and check the raw bytes + // survive. This catches encoding bugs in the template pipeline. + let engine = engine_with("{{ _msg }}", "{{ _msg }}"); + let event = load_fixture("edge_unicode_msg.json"); + let rendered = engine + .render("t", &event, "unicode", "vlprod") + .expect("render should succeed on unicode event"); + assert!(rendered.body.contains("支付失败"), "lost CJK codepoints"); + assert!(rendered.body.contains('\u{2705}'), "lost emoji codepoint"); +} diff --git a/tests/vl_fixtures_consistency.rs b/tests/vl_fixtures_consistency.rs new file mode 100644 index 0000000..02f9674 --- /dev/null +++ b/tests/vl_fixtures_consistency.rs @@ -0,0 +1,164 @@ +//! Consistency checks between `tests/fixtures/vl_events/index.yaml` and the +//! filesystem. This is the gate that keeps the corpus honest: no orphaned +//! files, no phantom entries, every fixture parses, every fixture carries the +//! VL-mandatory fields. + +use std::collections::BTreeSet; + +mod common; + +use common::vl_events::{ + all_fixtures, filesystem_fixtures, load_fixtures_by_tag, manifest_entries, +}; + +/// Required top-level fields on every `/select/logsql/tail` event. +const REQUIRED_FIELDS: &[&str] = &["_msg", "_time", "_stream"]; + +#[test] +fn corpus_contains_at_least_fifteen_fixtures() { + let count = manifest_entries().len(); + assert!( + count >= 15, + "spec requires at least 15 fixtures, found {}", + count + ); +} + +#[test] +fn index_and_filesystem_agree() { + let index_names: BTreeSet = manifest_entries().keys().cloned().collect(); + let disk_names: BTreeSet = filesystem_fixtures().into_iter().collect(); + + let orphans: Vec<&String> = disk_names.difference(&index_names).collect(); + let phantoms: Vec<&String> = index_names.difference(&disk_names).collect(); + + assert!( + orphans.is_empty(), + "orphan fixtures on disk without an index.yaml entry: {:?}. \ + Either add them to tests/fixtures/vl_events/index.yaml or delete them.", + orphans + ); + assert!( + phantoms.is_empty(), + "phantom fixtures declared in index.yaml but missing on disk: {:?}. \ + Either create the file or remove the entry from index.yaml.", + phantoms + ); +} + +#[test] +fn every_fixture_parses_as_json() { + // `all_fixtures()` already panics on parse error; this test just exercises + // every file so CI surfaces the panic with a clear path. + let loaded = all_fixtures(); + assert!(!loaded.is_empty(), "no fixtures discovered"); + for (name, value) in &loaded { + assert!( + value.is_object(), + "fixture {} parsed but is not a JSON object (got {})", + name, + match value { + serde_json::Value::Null => "null", + serde_json::Value::Bool(_) => "bool", + serde_json::Value::Number(_) => "number", + serde_json::Value::String(_) => "string", + serde_json::Value::Array(_) => "array", + serde_json::Value::Object(_) => "object", + } + ); + } +} + +#[test] +fn every_fixture_is_single_line_json() { + // VL's /select/logsql/tail emits one event per line; the corpus must + // mirror that shape so consumers can copy-paste fixtures into mocks. + let dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/vl_events"); + for name in filesystem_fixtures() { + let path = dir.join(&name); + let raw = std::fs::read_to_string(&path) + .unwrap_or_else(|e| panic!("failed to read {}: {}", path.display(), e)); + // Allow a single trailing newline (POSIX convention) but no embedded + // newlines: the JSON payload itself must be on one line. + let trimmed = raw.strip_suffix('\n').unwrap_or(&raw); + assert!( + !trimmed.contains('\n'), + "fixture {} spans multiple lines. VL tail output is one JSON per \ + line; fixtures must match.", + name + ); + } +} + +#[test] +fn every_fixture_has_required_vl_fields() { + // Self-contained: do not assume parses_as_json ran first. `cargo test` + // parallelises tests by default, so cross-test dependencies via panic + // messages are unreliable. + for (name, value) in all_fixtures() { + let obj = value.as_object().unwrap_or_else(|| { + panic!( + "fixture {} is not a top-level JSON object (see also parses_as_json test)", + name + ) + }); + for field in REQUIRED_FIELDS { + assert!( + obj.contains_key(*field), + "fixture {} is missing required VL field `{}`", + name, + field + ); + } + // _msg must be a non-empty string so the smoke test can render it. + let msg = obj + .get("_msg") + .and_then(|v| v.as_str()) + .unwrap_or_else(|| panic!("fixture {} has non-string _msg", name)); + assert!( + !msg.is_empty(), + "fixture {} has empty _msg (not useful for template smoke tests)", + name + ); + } +} + +#[test] +fn every_entry_has_description_source_and_tags() { + for (name, entry) in manifest_entries() { + assert!( + !entry.description.trim().is_empty(), + "fixture {} has an empty description", + name + ); + assert!( + !entry.source_system.trim().is_empty(), + "fixture {} has an empty source_system", + name + ); + assert!( + !entry.tags.is_empty(), + "fixture {} has no tags (require at least one for discoverability)", + name + ); + } +} + +#[test] +fn tag_lookup_returns_empty_for_unknown() { + // Contract from the spec: unknown tags yield an empty Vec, caller asserts. + let hits = load_fixtures_by_tag("this_tag_does_not_exist_anywhere_xyz"); + assert!(hits.is_empty(), "expected empty result for unknown tag"); +} + +#[test] +fn dotted_keys_tag_is_populated() { + // The corpus exists primarily to catch regressions like #25. If this + // tag ever ends up empty, the smoke test below becomes a no-op. + let hits = load_fixtures_by_tag("dotted_keys"); + assert!( + !hits.is_empty(), + "no fixtures tagged `dotted_keys` — the regression harness for #25 \ + would silently skip" + ); +}