diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f71326..108dc87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,92 @@ # Changelog +## v0.3.0 — Metadata Namespace Redesign (BREAKING) + +### Breaking changes + +- Top-level `ads:` block is removed. Move it under `metadata.ads:`. +- Top-level flat `metadata:` with maturity fields at root is removed. Nest + fields under `metadata.maturity:`. +- All seven resource types now share a reserved top-level `metadata:` + namespace that is stripped from API payloads and content hashes. Third-party + frameworks can add sub-namespaces (e.g. `metadata.<framework>:`) with a + guarantee they will never leak to the CrowdStrike/Humio API. + +### Migration (before → after) + +Before: + +```yaml +resource_id: my_detection +name: "My Detection" +ads: + goal: "Detect something" + mitre_attack: ["TA0011:T1090.003"] +metadata: + created: "2026-04-14" + last_tuned: "2026-04-16" + tune_count: 2 + confidence: high +severity: 50 +search: { ... } +``` + +After: + +```yaml +resource_id: my_detection +name: "My Detection" +metadata: + maturity: + created: "2026-04-14" + last_tuned: "2026-04-16" + tune_count: 2 + confidence: high + ads: + goal: "Detect something" + mitre_attack: ["TA0011:T1090.003"] +severity: 50 +search: { ... } +``` + +Migration rules (mechanical): + +- Top-level `ads:` → `metadata.ads:`. +- Top-level flat `metadata:` with `created` / `last_tuned` / `tune_count` / + `confidence` at root → `metadata.maturity:` with the same fields inside. +- If both were present, merge into a single `metadata:` with `maturity:` and + `ads:` sub-namespaces. + +No `talonctl migrate` command ships — the corpus is small enough to hand-edit +or rewrite with an editor or assistant. + +### Fixed + +- Dashboard apply no longer leaks `_template_path` into the Humio schema + validation payload (closes issue #7). 
The stripping fix is shared across + all resource types via `core/template_sanitizer` (shipped in v0.2.1) and + is now exercised by every provider's payload/hash path. +- Reference templates `examples/resources/lookup_file.yaml`, + `rtr_put_file.yaml`, and `workflow.yaml` had pre-existing schema bugs + (missing required fields, wrong field names, broken file paths). These + are fixed and now exercised by a new parity test. + +### Added + +- `metadata.maturity:` validation now runs on every resource type, not just + detections. Schema: `created` (YYYY-MM-DD), `last_tuned` (YYYY-MM-DD or + null), `tune_count` (non-negative int), `confidence` (low/medium/high/ + validated). All four fields optional. +- `metadata.ads:` remains detection-only. Non-detection providers reject it + with a clear error pointing at the resource type. +- Third-party frameworks may add arbitrary sub-namespaces under `metadata.<framework>:` + (e.g. `metadata.acme_corp:`). These are stripped from API payloads and + ignored by talonctl; users supply their own validators if needed. +- `tests/unit/test_examples.py` parity test ensures reference YAMLs under + `examples/resources/` do not drift out of spec. +- `tests/unit/test_old_shape_rejection.py` locks every provider's migration + pointer error string so a silent refactor can never drop it. 
+ ## v0.2.1 — 2026-04-16 ### Fixed diff --git a/examples/resources/README.md b/examples/resources/README.md index 2641f3a..0ebce9b 100644 --- a/examples/resources/README.md +++ b/examples/resources/README.md @@ -27,4 +27,39 @@ python scripts/resource_deploy.py plan --resources=detection - **`name`** — display name in Falcon console, can be updated freely - **`_search_domain`** — which NGSIEM search domain the resource belongs to +## The `metadata:` namespace + +Every resource type (detection, saved_search, dashboard, workflow, lookup_file, +rtr_script, rtr_put_file) supports an optional top-level `metadata:` block that +is **always stripped from the API payload and content hash**. Editing anything +under `metadata:` produces zero output in `talonctl plan`. + +Two sub-namespaces are validated out-of-the-box: + +- **`metadata.maturity:`** (universal) — track created date, last tuned date, + tune count, and confidence level. Optional fields: + - `created`: ISO date `YYYY-MM-DD` + - `last_tuned`: ISO date `YYYY-MM-DD` or `null` (never tuned) + - `tune_count`: non-negative integer + - `confidence`: one of `low`, `medium`, `high`, `validated` + +- **`metadata.ads:`** (detection-only) — Palantir Alerting & Detection Strategy + documentation. Schema: see `examples/resources/detection.yaml` for the + annotated reference. + +Third-party frameworks may add their own sub-namespaces under `metadata.<framework>`: + +```yaml +metadata: + maturity: + created: "2026-04-16" + my_framework: + anything: true + custom_refs: ["path/to/file.md#anchor"] +``` + +Talonctl does not validate third-party sub-namespaces — users supply their own +validators if needed. The guarantee talonctl makes is that `metadata.<framework>` +never leaks to the API and never affects the content hash. + See the main README and CLAUDE.md for full documentation. 
diff --git a/examples/resources/_assets/sysmonconfig-export.xml b/examples/resources/_assets/sysmonconfig-export.xml new file mode 100644 index 0000000..21bb891 --- /dev/null +++ b/examples/resources/_assets/sysmonconfig-export.xml @@ -0,0 +1,5 @@ + + + + diff --git a/examples/resources/_assets/tor_exit_nodes.csv b/examples/resources/_assets/tor_exit_nodes.csv new file mode 100644 index 0000000..83285ac --- /dev/null +++ b/examples/resources/_assets/tor_exit_nodes.csv @@ -0,0 +1,3 @@ +ip +203.0.113.1 +203.0.113.2 diff --git a/examples/resources/detection.yaml b/examples/resources/detection.yaml index d84fc97..d72e06b 100644 --- a/examples/resources/detection.yaml +++ b/examples/resources/detection.yaml @@ -2,16 +2,19 @@ # Detects TOR traffic from internal networks to the internet. # This is a real, deployable detection — 100% generic, no tenant-specific content. # -# Optional top-level blocks demonstrated below: -# - metadata: detection maturity tracking (created, last_tuned, tune_count, confidence). -# All four fields optional when the block is present. Ignored by the CrowdStrike -# API and excluded from compute_content_hash — editing any metadata field produces -# zero output in `talonctl plan`. -# - ads: Alerting & Detection Strategy documentation. Optional; when present, `goal` -# is required. The fields `false_positives`, `response`, and `validation` each -# accept either inline content or a `{path, label?}` ref dict pointing at a -# knowledge file (e.g. knowledge/patterns/.md#anchor). Both the inline -# and ref-dict forms are shown below. +# The top-level `metadata:` block (new in v0.3.0) is reserved, stripped from API +# payloads, and excluded from compute_content_hash. Editing anything under +# `metadata:` produces zero output in `talonctl plan`. Two sub-namespaces are +# validated out-of-the-box: +# - metadata.maturity: detection maturity tracking — created, last_tuned, +# tune_count, confidence. All four fields optional. 
+# - metadata.ads: Alerting & Detection Strategy documentation (detection-only). +# When present, `goal` is required. The fields `false_positives`, `response`, +# and `validation` each accept either inline content or a `{path, label?}` ref +# dict pointing at a knowledge file (e.g. knowledge/patterns/.md#anchor). +# Both the inline and ref-dict forms are shown below. +# Third-party frameworks may add their own sub-namespaces under `metadata.` — +# talonctl will strip them from the API and ignore them. resource_id: generic___network___tor_traffic_to_the_internet name: Generic - Network - TOR Traffic to the Internet @@ -22,48 +25,49 @@ severity: 50 status: active mitre_attack: ["TA0011:T1090.003"] metadata: - created: "2026-04-14" - last_tuned: "2026-04-16" - tune_count: 2 - confidence: high -ads: - goal: > - Detect traffic associated with TOR anonymization network from internal - networks, which may indicate an adversary attempting to evade network - monitoring or exfiltrate data through encrypted channels. - mitre_attack: - - "Command and Control / Proxy: Multi-hop Proxy (T1090.003)" - strategy_abstract: > - Monitors network flow logs for TCP connections to known TOR ports - (9001, 9030, 9040, 9050, 9051, 9150) from internal RFC1918 addresses, - plus port 443 with TOR application identification. - technical_context: > - Requires network flow logs from supported vendors (Akamai, AWS, Cisco, - Cloudflare, Corelight, etc.) with destination port and application fields. - Uses CIDR matching to identify internal-to-external traffic only. 
- blind_spots: - - "TOR over non-standard ports not covered" - - "TOR bridges using obfs4 transport may not be identified" - - "Vendors not in the filter list are excluded" - false_positives: - - pattern: "Security tool TOR exit node scanning" - characteristics: "Automated security scanner source IPs" - tuning: "Add scanner IPs to exclusion list" - status: "open" - # Ref-form alternative — points at the canonical pattern doc in the user - # project's knowledge/ directory. Useful when the same FP pattern applies - # across multiple detections and lives in a shared pattern file. - - path: "knowledge/patterns/network.md#tor-bridges-obfs4" - label: "TOR bridge traffic via obfs4 transport" - validation: - - "Generate TOR traffic from an internal IP to port 9050" - - "Verify alert fires with correct source/destination fields" - priority_rationale: > - Medium severity (50) — TOR usage is suspicious but may have legitimate - privacy use cases. Requires analyst investigation to determine intent. - response: "Investigate user, check for data exfiltration, review browsing history" - ads_created: "2026-04-14" - ads_author: "talonctl example" + maturity: + created: "2026-04-14" + last_tuned: "2026-04-16" + tune_count: 2 + confidence: high + ads: + goal: > + Detect traffic associated with TOR anonymization network from internal + networks, which may indicate an adversary attempting to evade network + monitoring or exfiltrate data through encrypted channels. + mitre_attack: + - "Command and Control / Proxy: Multi-hop Proxy (T1090.003)" + strategy_abstract: > + Monitors network flow logs for TCP connections to known TOR ports + (9001, 9030, 9040, 9050, 9051, 9150) from internal RFC1918 addresses, + plus port 443 with TOR application identification. + technical_context: > + Requires network flow logs from supported vendors (Akamai, AWS, Cisco, + Cloudflare, Corelight, etc.) with destination port and application fields. 
+ Uses CIDR matching to identify internal-to-external traffic only. + blind_spots: + - "TOR over non-standard ports not covered" + - "TOR bridges using obfs4 transport may not be identified" + - "Vendors not in the filter list are excluded" + false_positives: + - pattern: "Security tool TOR exit node scanning" + characteristics: "Automated security scanner source IPs" + tuning: "Add scanner IPs to exclusion list" + status: "open" + # Ref-form alternative — points at the canonical pattern doc in the user + # project's knowledge/ directory. Useful when the same FP pattern applies + # across multiple detections and lives in a shared pattern file. + - path: "knowledge/patterns/network.md#tor-bridges-obfs4" + label: "TOR bridge traffic via obfs4 transport" + validation: + - "Generate TOR traffic from an internal IP to port 9050" + - "Verify alert fires with correct source/destination fields" + priority_rationale: > + Medium severity (50) — TOR usage is suspicious but may have legitimate + privacy use cases. Requires analyst investigation to determine intent. 
+ response: "Investigate user, check for data exfiltration, review browsing history" + ads_created: "2026-04-14" + ads_author: "talonctl example" search: filter: | #repo!=xdr_* diff --git a/examples/resources/lookup_file.yaml b/examples/resources/lookup_file.yaml index 37a536b..5bfc010 100644 --- a/examples/resources/lookup_file.yaml +++ b/examples/resources/lookup_file.yaml @@ -12,5 +12,5 @@ description: | | match(file="tor_exit_nodes.csv", field=RemoteAddressIP4, column="ip", strict=false) Source: Query-Hub (open-source review) format: csv -source: resources/lookup_files/crowdstrike/tor_exit_nodes.csv +source: examples/resources/_assets/tor_exit_nodes.csv _search_domain: all diff --git a/examples/resources/rtr_put_file.yaml b/examples/resources/rtr_put_file.yaml index e8f4f38..974fb8a 100644 --- a/examples/resources/rtr_put_file.yaml +++ b/examples/resources/rtr_put_file.yaml @@ -9,5 +9,4 @@ name: sysmonconfig-export.xml description: | Sysmon configuration file for enhanced endpoint telemetry. Pushed to endpoints during IR setup or baseline hardening. -source: resources/rtr_put_files/sysmonconfig-export.xml -file_type: config +file_path: _assets/sysmonconfig-export.xml diff --git a/examples/resources/workflow.yaml b/examples/resources/workflow.yaml index 9a711f3..a79db30 100644 --- a/examples/resources/workflow.yaml +++ b/examples/resources/workflow.yaml @@ -10,20 +10,20 @@ description: | Sends a notification when a critical-severity detection fires. Trigger: detection with severity >= 70. Action: Posts to a webhook endpoint. -definition: - # The definition field contains the Falcon Fusion workflow JSON. - # Export an existing workflow from the Falcon console to get the full structure, - # or use the fusion-workflows skill to build one from scratch. 
- # - # Example structure (simplified): - # { - # "trigger": { "type": "detection", "conditions": { "severity": { "gte": 70 } } }, - # "actions": [ - # { "type": "webhook", "config": { "url": "https://hooks.example.com/alerts" } } - # ] - # } +enabled: true trigger: - type: detection + event: Investigatable/NGSIEM + type: Signal + conditions: + severity: + gte: 70 +actions: + webhook_notify: + id: generic_webhook + name: Post to webhook + properties: + url: https://hooks.example.com/alerts +conditions: {} tags: - notification - critical diff --git a/src/talonctl/core/metadata_validators.py b/src/talonctl/core/metadata_validators.py new file mode 100644 index 0000000..bf55559 --- /dev/null +++ b/src/talonctl/core/metadata_validators.py @@ -0,0 +1,106 @@ +"""Shared metadata-block validators used by every provider's validate_template. + +This module owns the universal metadata.maturity schema. Per-resource-type +validators (e.g. metadata.ads for detections) live in the provider that owns +that namespace — they are not shared here. +""" + +from __future__ import annotations + +import re +from typing import Any, Dict, List + +_DATE_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$") + +MATURITY_ALLOWED_FIELDS = frozenset({"created", "last_tuned", "tune_count", "confidence"}) +MATURITY_DATE_FIELDS = frozenset({"created", "last_tuned"}) +MATURITY_CONFIDENCE_VALUES = frozenset({"low", "medium", "high", "validated"}) + + +def validate_maturity(template: Dict[str, Any]) -> List[str]: + """Validate `template["metadata"]["maturity"]` if present. Return error list. + + Empty list when the block is absent or valid. All four maturity fields are + optional when the maturity block itself is present. Errors accumulate — this + function does NOT short-circuit. 
+ """ + errors: List[str] = [] + metadata = template.get("metadata") + if metadata is None: + return errors + + if not isinstance(metadata, dict): + errors.append("'metadata' must be a dictionary") + return errors + + maturity = metadata.get("maturity") + if maturity is None: + return errors + + if not isinstance(maturity, dict): + errors.append("'metadata.maturity' must be a dictionary") + return errors + + unknown = set(maturity.keys()) - MATURITY_ALLOWED_FIELDS + if unknown: + known = ", ".join(sorted(MATURITY_ALLOWED_FIELDS)) + errors.append(f"Unknown metadata.maturity key(s): {', '.join(sorted(unknown))}. Known keys: {known}") + + for field in MATURITY_DATE_FIELDS: + if field not in maturity: + continue + val = maturity[field] + if field == "last_tuned" and val is None: + continue + if not isinstance(val, str) or not _DATE_PATTERN.match(val): + suffix = " or null" if field == "last_tuned" else "" + errors.append(f"metadata.maturity.{field} must be YYYY-MM-DD date{suffix} (got {val!r})") + + if "tune_count" in maturity: + val = maturity["tune_count"] + # bool is a subclass of int — reject it explicitly. + if isinstance(val, bool) or not isinstance(val, int) or val < 0: + errors.append(f"metadata.maturity.tune_count must be a non-negative integer (got {val!r})") + + if "confidence" in maturity: + val = maturity["confidence"] + if val not in MATURITY_CONFIDENCE_VALUES: + allowed = ", ".join(["low", "medium", "high", "validated"]) + errors.append(f"metadata.maturity.confidence must be one of: {allowed} (got {val!r})") + + return errors + + +def reject_old_shape(template: Dict[str, Any]) -> List[str]: + """Return errors for pre-v0.3.0 top-level shapes. + + Old shapes rejected: + - top-level `ads:` (use metadata.ads instead) + - top-level flat `metadata:` with maturity fields as direct children + (use metadata.maturity. instead) + + Empty list on clean (new-shape) input. 
Called FIRST in every provider's + validate_template so users see the migration pointer rather than a cascade + of format errors from the relocated validators. + """ + errors: List[str] = [] + + if "ads" in template: + errors.append( + "Top-level 'ads:' is removed in v0.3.0. Move it under 'metadata.ads' " + "in your YAML template. See CHANGELOG.md (v0.3.0) for a before/after example." + ) + + metadata = template.get("metadata") + if isinstance(metadata, dict): + flat_fields_at_root = set(metadata.keys()) & MATURITY_ALLOWED_FIELDS + if flat_fields_at_root: + errors.append( + "Top-level 'metadata:' now reserves sub-namespaces; maturity fields " + "must be nested. Move " + f"{', '.join(repr(f) for f in sorted(flat_fields_at_root))} " + "under 'metadata.maturity'. See CHANGELOG.md (v0.3.0) for a " + "before/after example." + ) + + return errors diff --git a/src/talonctl/providers/dashboard_provider.py b/src/talonctl/providers/dashboard_provider.py index cd3d897..7aa89b4 100644 --- a/src/talonctl/providers/dashboard_provider.py +++ b/src/talonctl/providers/dashboard_provider.py @@ -17,6 +17,7 @@ import yaml from talonctl.core.base_provider import BaseResourceProvider, ResourceChange, ResourceAction +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity from talonctl.core.template_sanitizer import strip_for_api, strip_for_hash logger = logging.getLogger(__name__) @@ -49,6 +50,15 @@ def get_resource_type(self) -> str: def validate_template(self, template: Dict[str, Any]) -> List[str]: errors = [] + # v0.3.0: reject pre-v0.3.0 shapes and validate metadata.maturity universally. + errors.extend(reject_old_shape(template)) + errors.extend(validate_maturity(template)) + + # metadata.ads is detection-only; flag on this provider. 
+ metadata_block = template.get("metadata") + if isinstance(metadata_block, dict) and "ads" in metadata_block: + errors.append("metadata.ads is only supported on detection resources (this is a dashboard template)") + for field in ("resource_id", "name", "sections", "widgets"): if field not in template or not template[field]: errors.append(f"Required field '{field}' is missing or empty") diff --git a/src/talonctl/providers/detection_provider.py b/src/talonctl/providers/detection_provider.py index d3a24a6..bb7ca8b 100644 --- a/src/talonctl/providers/detection_provider.py +++ b/src/talonctl/providers/detection_provider.py @@ -7,13 +7,14 @@ import json import hashlib -import re import logging from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timezone from talonctl.core.base_provider import BaseResourceProvider, ResourceAction, ResourceChange from talonctl.core.deployment_strategies import DeploymentStrategyFactory +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity +from talonctl.core.template_sanitizer import strip_for_api, strip_for_hash from talonctl.utils.mitre_processor import MitreProcessor logger = logging.getLogger(__name__) @@ -105,6 +106,13 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: """ errors = [] + # v0.3.0: reject pre-v0.3.0 shapes before running relocated validators so users + # see the migration pointer instead of a cascade of format errors. + errors.extend(reject_old_shape(template)) + + # v0.3.0: universal maturity validation (reads template["metadata"]["maturity"]). 
+ errors.extend(validate_maturity(template)) + # Required fields required_fields = ["name", "description", "severity", "search"] for field in required_fields: @@ -180,12 +188,14 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: # Legacy format: top-level tactic/technique fields are also valid # No validation needed - these are optional - # Validate ADS metadata if present (optional block, strict when present). - # Schema: docs/superpowers/specs/2026-04-16-metadata-schema-and-ads-refs-design.md §2. - ads = template.get("ads") + # v0.3.0: ADS metadata relocated to template["metadata"]["ads"] (was top-level ads:). + # Internal shape rules (goal required, ref-dict vs inline-FP, etc.) unchanged. + # Schema: docs/superpowers/specs/2026-04-16-metadata-namespace-redesign-design.md. + metadata_block = template.get("metadata") + ads = metadata_block.get("ads") if isinstance(metadata_block, dict) else None if ads is not None: if not isinstance(ads, dict): - errors.append("'ads' must be a dictionary") + errors.append("'metadata.ads' must be a dictionary") else: # Required fields (goal) when ads: is present. for field in self.ADS_REQUIRED_FIELDS: @@ -263,45 +273,6 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: else: errors.append("ads.response must be a string or ref dict ({path, label?})") - # Validate metadata: block if present (optional block, strict when present). - # See docs/superpowers/specs/2026-04-16-metadata-schema-and-ads-refs-design.md §1. - metadata = template.get("metadata") - if metadata is not None: - if not isinstance(metadata, dict): - errors.append("'metadata' must be a dictionary") - else: - # Reject unknown keys with typo-friendly error listing known keys. - unknown = set(metadata.keys()) - self.METADATA_ALLOWED_FIELDS - if unknown: - known = ", ".join(sorted(self.METADATA_ALLOWED_FIELDS)) - errors.append(f"Unknown metadata key(s): {', '.join(sorted(unknown))}. 
Known keys: {known}") - - # Validate date fields (YYYY-MM-DD; last_tuned additionally allows null). - for field in self.METADATA_DATE_FIELDS: - if field not in metadata: - continue - val = metadata[field] - if field == "last_tuned" and val is None: - continue - if not isinstance(val, str) or not self._DATE_PATTERN.match(val): - errors.append( - f"metadata.{field} must be YYYY-MM-DD date" - f"{' or null' if field == 'last_tuned' else ''} (got {val!r})" - ) - - # Validate tune_count (non-negative int; bool explicitly rejected). - if "tune_count" in metadata: - val = metadata["tune_count"] - if isinstance(val, bool) or not isinstance(val, int) or val < 0: - errors.append(f"metadata.tune_count must be a non-negative integer (got {val!r})") - - # Validate confidence enum. - if "confidence" in metadata: - val = metadata["confidence"] - if val not in self.METADATA_CONFIDENCE_VALUES: - allowed = ", ".join(sorted(self.METADATA_CONFIDENCE_VALUES)) - errors.append(f"metadata.confidence must be one of: {allowed} (got {val!r})") - return errors def fetch_remote_state(self, resource_id: str) -> Optional[Dict[str, Any]]: @@ -823,6 +794,8 @@ def _prepare_rule_payload(self, template: Dict[str, Any]) -> Dict[str, Any]: Supports both legacy and new template formats. The CrowdStrike API accepts the search config as-is, so we pass it through. """ + # v0.3.0: single source of truth for reserved/internal key stripping. + template = strip_for_api(template) payload = { "name": template["name"], "description": template.get("description", ""), @@ -872,6 +845,8 @@ def _prepare_patch_payload(self, template: Dict[str, Any]) -> Dict[str, Any]: This method creates a payload suitable for entities_rules_patch_v1 endpoint. """ + # v0.3.0: single source of truth for reserved/internal key stripping. 
+ template = strip_for_api(template) payload = { "description": template.get("description", ""), "severity": template.get("severity", 50), @@ -932,15 +907,7 @@ def _prepare_patch_payload(self, template: Dict[str, Any]) -> Dict[str, Any]: "ads_author", } - # Detection maturity metadata — optional block on detection templates. - # Strict validation when present (mirrors the ads: pattern). Ignored by - # the CrowdStrike API and excluded from compute_content_hash because the - # hash uses CONTENT_FIELDS/SEARCH_FIELDS allowlists. - METADATA_ALLOWED_FIELDS = {"created", "last_tuned", "tune_count", "confidence"} - METADATA_REQUIRED_FIELDS: set = set() # All fields optional when block present. - METADATA_CONFIDENCE_VALUES = {"low", "medium", "high", "validated"} - METADATA_DATE_FIELDS = {"created", "last_tuned"} - _DATE_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$") + # v0.3.0: METADATA_* constants removed — now owned by core/metadata_validators.py. # Ref-dict shape accepted by ads.false_positives / ads.response / ads.validation. # See docs/superpowers/specs/2026-04-16-metadata-schema-and-ads-refs-design.md §2. @@ -957,6 +924,10 @@ def compute_content_hash(self, template: Dict[str, Any]) -> str: Works on both template data and raw API response data by extracting the same canonical set of fields from either format. """ + # v0.3.0: strip universal IaC-only + internal fields first for consistency + # with other providers. The allowlist build below is the effective filter, + # but this call locks in the single-source-of-truth for reserved keys. 
+ template = strip_for_hash(template) # Extract only the fields we care about normalized_content = { "name": template.get("name", ""), diff --git a/src/talonctl/providers/lookup_file_provider.py b/src/talonctl/providers/lookup_file_provider.py index 7cc0fdb..e61f095 100644 --- a/src/talonctl/providers/lookup_file_provider.py +++ b/src/talonctl/providers/lookup_file_provider.py @@ -15,6 +15,8 @@ from datetime import datetime, timezone from talonctl.core.base_provider import BaseResourceProvider, ResourceAction, ResourceChange +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity +from talonctl.core.template_sanitizer import strip_for_hash logger = logging.getLogger(__name__) @@ -73,6 +75,15 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: """ errors = [] + # v0.3.0: reject pre-v0.3.0 shapes and validate metadata.maturity universally. + errors.extend(reject_old_shape(template)) + errors.extend(validate_maturity(template)) + + # metadata.ads is detection-only; flag on this provider. + metadata_block = template.get("metadata") + if isinstance(metadata_block, dict) and "ads" in metadata_block: + errors.append("metadata.ads is only supported on detection resources (this is a lookup_file template)") + # Required fields required_fields = ["name", "format", "source"] for field in required_fields: @@ -462,6 +473,8 @@ def compute_content_hash(self, template: Dict[str, Any]) -> str: Returns: SHA256 hash as hex string """ + # v0.3.0: strip universal IaC-only + internal + metadata fields first. 
+ template = strip_for_hash(template) # Read file content source_path = template["source"] if not os.path.isabs(source_path): diff --git a/src/talonctl/providers/rtr_put_file_provider.py b/src/talonctl/providers/rtr_put_file_provider.py index a52548d..33d2a57 100644 --- a/src/talonctl/providers/rtr_put_file_provider.py +++ b/src/talonctl/providers/rtr_put_file_provider.py @@ -19,6 +19,8 @@ from falconpy import RealTimeResponseAdmin from talonctl.core.base_provider import BaseResourceProvider, ResourceAction, ResourceChange +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity +from talonctl.core.template_sanitizer import strip_for_hash logger = logging.getLogger(__name__) @@ -104,6 +106,15 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: """ errors = [] + # v0.3.0: reject pre-v0.3.0 shapes and validate metadata.maturity universally. + errors.extend(reject_old_shape(template)) + errors.extend(validate_maturity(template)) + + # metadata.ads is detection-only; flag on this provider. + metadata_block = template.get("metadata") + if isinstance(metadata_block, dict) and "ads" in metadata_block: + errors.append("metadata.ads is only supported on detection resources (this is a rtr_put_file template)") + # Required fields required_fields = ["name", "description", "file_path"] for field in required_fields: @@ -404,13 +415,18 @@ def compute_content_hash(self, template: Dict[str, Any]) -> str: Returns: SHA256 hash as hex string """ + # v0.3.0: _template_path is consumed here to locate the binary file, so + # it MUST be read before strip_for_hash removes it. 
+ template_path = template.get("_template_path", ".") + template = strip_for_hash(template) + # Calculate hash of the binary file file_path = template.get("file_path", "") file_hash = "" if file_path: try: - template_dir = Path(template.get("_template_path", ".")).parent + template_dir = Path(template_path).parent full_path = template_dir / file_path if full_path.exists(): diff --git a/src/talonctl/providers/rtr_script_provider.py b/src/talonctl/providers/rtr_script_provider.py index 9183e8a..85401cd 100644 --- a/src/talonctl/providers/rtr_script_provider.py +++ b/src/talonctl/providers/rtr_script_provider.py @@ -19,6 +19,8 @@ from falconpy import RealTimeResponseAdmin from talonctl.core.base_provider import BaseResourceProvider, ResourceAction, ResourceChange +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity +from talonctl.core.template_sanitizer import strip_for_hash logger = logging.getLogger(__name__) @@ -110,6 +112,15 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: """ errors = [] + # v0.3.0: reject pre-v0.3.0 shapes and validate metadata.maturity universally. + errors.extend(reject_old_shape(template)) + errors.extend(validate_maturity(template)) + + # metadata.ads is detection-only; flag on this provider. + metadata_block = template.get("metadata") + if isinstance(metadata_block, dict) and "ads" in metadata_block: + errors.append("metadata.ads is only supported on detection resources (this is a rtr_script template)") + # Required fields required_fields = ["name", "description", "platform"] for field in required_fields: @@ -505,13 +516,18 @@ def compute_content_hash(self, template: Dict[str, Any]) -> str: Returns: SHA256 hash as hex string """ + # v0.3.0: _template_path is consumed here to resolve file-relative script + # paths, so it MUST be read before strip_for_hash removes it. 
+ template_path = template.get("_template_path", ".") + template = strip_for_hash(template) + # Load content if file_path is used content = template.get("content", "") file_path = template.get("file_path") if file_path and not content: try: - template_dir = Path(template.get("_template_path", ".")).parent + template_dir = Path(template_path).parent full_path = template_dir / file_path if full_path.exists(): with open(full_path, "r", encoding="utf-8") as f: diff --git a/src/talonctl/providers/saved_search_provider.py b/src/talonctl/providers/saved_search_provider.py index 64be4bc..1d420cd 100644 --- a/src/talonctl/providers/saved_search_provider.py +++ b/src/talonctl/providers/saved_search_provider.py @@ -15,6 +15,8 @@ from datetime import datetime, timezone from talonctl.core.base_provider import BaseResourceProvider, ResourceAction, ResourceChange +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity +from talonctl.core.template_sanitizer import strip_for_api, strip_for_hash logger = logging.getLogger(__name__) @@ -70,6 +72,15 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: """ errors = [] + # v0.3.0: reject pre-v0.3.0 shapes and validate metadata.maturity universally. + errors.extend(reject_old_shape(template)) + errors.extend(validate_maturity(template)) + + # metadata.ads is detection-only; flag on this provider. 
+ metadata_block = template.get("metadata") + if isinstance(metadata_block, dict) and "ads" in metadata_block: + errors.append("metadata.ads is only supported on detection resources (this is a saved_search template)") + # Required fields for LogScale saved query schema required_fields = ["$schema", "name", "queryString"] for field in required_fields: @@ -284,11 +295,8 @@ def create_resource(self, resource_id: Optional[str], template: Dict[str, Any]) logger.debug(f"[DEBUG] Original template keys: {list(template.keys())}") logger.debug(f"[DEBUG] Template name: '{template.get('name')}'") - # Create clean template without API parameters and IaC-only metadata fields - # Exclude: fields starting with '_' (e.g., _search_domain), 'type' (added by template discovery), - # and IaC-only fields that are not part of the LogScale saved query schema - IAC_ONLY_FIELDS = {"type", "resource_id", "dependencies"} - clean_template = {k: v for k, v in template.items() if not k.startswith("_") and k not in IAC_ONLY_FIELDS} + # v0.3.0: strip universal IaC-only + internal + metadata fields. + clean_template = strip_for_api(template) # DEBUG: Log cleaned template logger.debug(f"[DEBUG] Cleaned template keys: {list(clean_template.keys())}") @@ -386,11 +394,8 @@ def update_resource( logger.debug(f"[DEBUG] UPDATE - Original template keys: {list(template.keys())}") logger.debug(f"[DEBUG] UPDATE - Template name: '{template.get('name')}'") - # Create clean template without API parameters and IaC-only metadata fields - # Exclude: fields starting with '_' (e.g., _search_domain), 'type' (added by template discovery), - # and IaC-only fields that are not part of the LogScale saved query schema - IAC_ONLY_FIELDS = {"type", "resource_id", "dependencies"} - clean_template = {k: v for k, v in template.items() if not k.startswith("_") and k not in IAC_ONLY_FIELDS} + # v0.3.0: strip universal IaC-only + internal + metadata fields. 
+ clean_template = strip_for_api(template) # DEBUG: Log cleaned template logger.debug(f"[DEBUG] UPDATE - Cleaned template keys: {list(clean_template.keys())}") @@ -561,6 +566,9 @@ def compute_content_hash(self, template: Dict[str, Any]) -> str: Returns: SHA256 hash as hex string """ + # v0.3.0: strip universal IaC-only + internal + metadata fields first. + template = strip_for_hash(template) + # Normalize content for consistent hashing # Only hash the LogScale schema fields (exclude API parameters like _search_domain) normalized_content = { diff --git a/src/talonctl/providers/workflow_provider.py b/src/talonctl/providers/workflow_provider.py index 1ac06f2..1c89862 100644 --- a/src/talonctl/providers/workflow_provider.py +++ b/src/talonctl/providers/workflow_provider.py @@ -14,6 +14,8 @@ from datetime import datetime, timezone from talonctl.core.base_provider import BaseResourceProvider, ResourceAction, ResourceChange +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity +from talonctl.core.template_sanitizer import strip_for_hash try: from falconpy import Workflows @@ -79,6 +81,15 @@ def validate_template(self, template: Dict[str, Any]) -> List[str]: """ errors = [] + # v0.3.0: reject pre-v0.3.0 shapes and validate metadata.maturity universally. + errors.extend(reject_old_shape(template)) + errors.extend(validate_maturity(template)) + + # metadata.ads is detection-only; flag on this provider. + metadata_block = template.get("metadata") + if isinstance(metadata_block, dict) and "ads" in metadata_block: + errors.append("metadata.ads is only supported on detection resources (this is a workflow template)") + # Required fields required_fields = ["resource_id", "name", "trigger", "actions"] for field in required_fields: @@ -405,6 +416,8 @@ def compute_content_hash(self, template: Dict[str, Any]) -> str: Only includes fields that affect workflow behavior """ + # v0.3.0: strip universal IaC-only + internal + metadata fields first. 
+ template = strip_for_hash(template) # Normalize content for consistent hashing normalized_content = { "name": template.get("name", ""), diff --git a/tests/test_dashboard_provider.py b/tests/test_dashboard_provider.py index 6988338..ffcf059 100644 --- a/tests/test_dashboard_provider.py +++ b/tests/test_dashboard_provider.py @@ -607,3 +607,56 @@ def test_normalize_for_hash_ignores_template_path(self, provider, valid_template hash_with = provider.compute_content_hash(tmpl_with_path) assert hash_without == hash_with + + +# --- v0.3.0 metadata namespace redesign --- + + +@pytest.fixture +def minimal_dashboard(): + return { + "resource_id": "x", + "name": "Test Dashboard", + "sections": {"s0": {"order": 0, "widgetIds": ["w0"]}}, + "widgets": {"w0": {"type": "note", "text": "hi"}}, + } + + +class TestV03MetadataNamespace: + def test_metadata_maturity_validates(self, provider, minimal_dashboard): + minimal_dashboard["metadata"] = {"maturity": {"created": "2026-04-16", "confidence": "medium"}} + assert provider.validate_template(minimal_dashboard) == [] + + def test_metadata_ads_rejected(self, provider, minimal_dashboard): + minimal_dashboard["metadata"] = {"ads": {"goal": "g"}} + errors = provider.validate_template(minimal_dashboard) + assert any("metadata.ads is only supported on detection resources" in e and "dashboard" in e for e in errors) + + def test_old_top_level_ads_rejected(self, provider, minimal_dashboard): + minimal_dashboard["ads"] = {"goal": "g"} + errors = provider.validate_template(minimal_dashboard) + assert any("Top-level 'ads:' is removed in v0.3.0" in e for e in errors) + + def test_metadata_edits_do_not_change_content_hash(self, provider, minimal_dashboard): + base_hash = provider.compute_content_hash(minimal_dashboard) + with_metadata = copy.deepcopy(minimal_dashboard) + with_metadata["metadata"] = { + "maturity": {"created": "2026-04-16", "tune_count": 1}, + "acme_corp": {"any": "thing"}, + } + assert provider.compute_content_hash(with_metadata) == 
base_hash + + def test_payload_strips_metadata_and_template_path(self, provider, minimal_dashboard): + # Direct regression test for issue #7 — _template_path must not leak into + # the Humio YAML upload, and neither should the new metadata: block. + tmpl = copy.deepcopy(minimal_dashboard) + tmpl["metadata"] = {"maturity": {"created": "2026-04-16"}, "acme_corp": {"a": 1}} + tmpl["_template_path"] = "/tmp/x.yaml" + yaml_str = provider._prepare_yaml_payload(tmpl) + assert "_template_path" not in yaml_str + assert "metadata:" not in yaml_str + assert "acme_corp" not in yaml_str + assert "resource_id" not in yaml_str + # Provider-owned fields preserved (or transformed). + assert "sections:" in yaml_str + assert "widgets:" in yaml_str diff --git a/tests/unit/test_detection_provider.py b/tests/unit/test_detection_provider.py index cbe4694..e622737 100644 --- a/tests/unit/test_detection_provider.py +++ b/tests/unit/test_detection_provider.py @@ -366,149 +366,165 @@ def test_validate_template_ads_absent_passes(self, provider): assert errors == [] def test_validate_template_ads_valid(self, provider): - """Valid ads: block with required goal field""" + """Valid metadata.ads: block with required goal field (v0.3.0 shape).""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "goal": "Detect unauthorized access to EC2 security groups", - "mitre_attack": ["Defense Evasion / Impair Defenses"], - "blind_spots": ["Service-linked role changes not logged"], - "strategy_abstract": "Correlates EC2 SG changes with known CI/CD patterns", + "metadata": { + "ads": { + "goal": "Detect unauthorized access to EC2 security groups", + "mitre_attack": ["Defense Evasion / Impair Defenses"], + "blind_spots": ["Service-linked role changes not logged"], + "strategy_abstract": "Correlates EC2 SG changes with known CI/CD patterns", + } }, } errors = provider.validate_template(template) assert errors == [] def 
test_validate_template_ads_missing_goal(self, provider): - """ads: block present without goal should fail""" + """metadata.ads block present without goal should fail.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "strategy_abstract": "Some strategy", + "metadata": { + "ads": { + "strategy_abstract": "Some strategy", + } }, } errors = provider.validate_template(template) assert any("ads.goal" in err for err in errors) def test_validate_template_ads_empty_goal(self, provider): - """ads: block with empty goal should fail""" + """metadata.ads block with empty goal should fail.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "goal": "", + "metadata": { + "ads": { + "goal": "", + } }, } errors = provider.validate_template(template) assert any("ads.goal" in err for err in errors) def test_validate_template_ads_unknown_field(self, provider): - """Unknown fields in ads: block should be rejected""" + """Unknown fields in metadata.ads block should be rejected.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "goal": "Detect something", - "unknown_field": "value", + "metadata": { + "ads": { + "goal": "Detect something", + "unknown_field": "value", + } }, } errors = provider.validate_template(template) assert any("unknown_field" in err for err in errors) def test_validate_template_ads_list_field_not_list(self, provider): - """List fields in ads: must be lists""" + """List fields in metadata.ads must be lists.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "goal": "Detect something", - "blind_spots": "not a list", + "metadata": { + "ads": { + "goal": "Detect something", + "blind_spots": "not a list", + } }, } errors = provider.validate_template(template) assert any("ads.blind_spots" in err and "list" 
in err for err in errors) def test_validate_template_ads_string_field_not_string(self, provider): - """String fields in ads: must be strings""" + """String fields in metadata.ads must be strings.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "goal": ["not", "a", "string"], + "metadata": { + "ads": { + "goal": ["not", "a", "string"], + } }, } errors = provider.validate_template(template) assert any("ads.goal" in err and "string" in err for err in errors) def test_validate_template_ads_not_dict(self, provider): - """ads: must be a dictionary if present""" + """metadata.ads must be a dictionary if present.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": "not a dict", + "metadata": {"ads": "not a dict"}, } errors = provider.validate_template(template) - assert any("'ads' must be a dictionary" in err for err in errors) + assert any("'metadata.ads' must be a dictionary" in err for err in errors) def test_validate_template_ads_false_positives_mixed_entries(self, provider): - """false_positives can contain both dicts and string references""" + """false_positives can contain both dicts and string references.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "goal": "Detect something", - "false_positives": [ - { - "pattern": "CI/CD Terraform deployments", - "characteristics": "github-actions-role ARN", - "tuning": "Filtered via $aws_service_account_detector()", - "status": "tuned", - }, - "-> knowledge/patterns/aws.md#autoscaling-service-role", - ], + "metadata": { + "ads": { + "goal": "Detect something", + "false_positives": [ + { + "pattern": "CI/CD Terraform deployments", + "characteristics": "github-actions-role ARN", + "tuning": "Filtered via $aws_service_account_detector()", + "status": "tuned", + }, + "-> knowledge/patterns/aws.md#autoscaling-service-role", + ], + } 
}, } errors = provider.validate_template(template) assert errors == [] def test_validate_template_ads_all_optional_fields(self, provider): - """All optional ADS fields should be accepted""" + """All optional ADS fields should be accepted.""" template = { "name": "Test Rule", "description": "Test", "severity": 50, "search": {"query": "test"}, - "ads": { - "goal": "Detect unauthorized access", - "mitre_attack": ["TA0005:T1562"], - "strategy_abstract": "Correlates SG changes", - "technical_context": "CloudTrail, EC2 SG API calls", - "blind_spots": ["Service-linked roles"], - "false_positives": ["CI/CD automation"], - "validation": ["Modify SG from unapproved role"], - "priority_rationale": "High-value asset modification", - "response": "See playbook cloud-security-aws.md", - "ads_created": "2026-04-14", - "ads_updated": "2026-04-14", - "ads_author": "Will Webster", + "metadata": { + "ads": { + "goal": "Detect unauthorized access", + "mitre_attack": ["TA0005:T1562"], + "strategy_abstract": "Correlates SG changes", + "technical_context": "CloudTrail, EC2 SG API calls", + "blind_spots": ["Service-linked roles"], + "false_positives": ["CI/CD automation"], + "validation": ["Modify SG from unapproved role"], + "priority_rationale": "High-value asset modification", + "response": "See playbook cloud-security-aws.md", + "ads_created": "2026-04-14", + "ads_updated": "2026-04-14", + "ads_author": "Will Webster", + } }, } errors = provider.validate_template(template) @@ -518,8 +534,9 @@ def test_validate_template_ads_all_optional_fields(self, provider): @pytest.fixture def minimal_detection(self): - """Minimal valid detection template — used for metadata/ADS test permutations.""" + """Minimal valid detection template (v0.3.0 new-shape) — used for metadata/ADS test permutations.""" return { + "resource_id": "x", "name": "Test Rule", "description": "Test", "severity": 50, @@ -531,19 +548,21 @@ def test_validate_metadata_absent_passes(self, provider, minimal_detection): errors = 
provider.validate_template(minimal_detection) assert errors == [] - def test_validate_metadata_empty_dict_passes(self, provider, minimal_detection): - """Empty metadata: {} is valid (all fields optional when block present).""" - minimal_detection["metadata"] = {} + def test_validate_metadata_empty_maturity_passes(self, provider, minimal_detection): + """Empty metadata.maturity: {} is valid (all fields optional when block present).""" + minimal_detection["metadata"] = {"maturity": {}} errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_metadata_full_valid_block(self, provider, minimal_detection): - """All four fields populated with valid values passes.""" + """All four maturity fields populated with valid values passes.""" minimal_detection["metadata"] = { - "created": "2026-01-15", - "last_tuned": "2026-04-10", - "tune_count": 3, - "confidence": "high", + "maturity": { + "created": "2026-01-15", + "last_tuned": "2026-04-10", + "tune_count": 3, + "confidence": "high", + } } errors = provider.validate_template(minimal_detection) assert errors == [] @@ -554,9 +573,9 @@ def test_validate_metadata_not_dict(self, provider, minimal_detection): errors = provider.validate_template(minimal_detection) assert any("'metadata' must be a dictionary" in err for err in errors) - def test_validate_metadata_unknown_key(self, provider, minimal_detection): - """Unknown keys in metadata: are rejected with typo-friendly error.""" - minimal_detection["metadata"] = {"created": "2026-01-15", "confidance": "high"} + def test_validate_metadata_unknown_maturity_key(self, provider, minimal_detection): + """Unknown keys in metadata.maturity: are rejected with typo-friendly error.""" + minimal_detection["metadata"] = {"maturity": {"created": "2026-01-15", "confidance": "high"}} errors = provider.validate_template(minimal_detection) assert any("confidance" in err for err in errors) # Error should list known keys for user guidance @@ -564,257 +583,320 @@ def 
test_validate_metadata_unknown_key(self, provider, minimal_detection): def test_validate_metadata_last_tuned_null(self, provider, minimal_detection): """last_tuned: null is valid (means never tuned).""" - minimal_detection["metadata"] = {"last_tuned": None, "tune_count": 0} + minimal_detection["metadata"] = {"maturity": {"last_tuned": None, "tune_count": 0}} errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_metadata_bad_date_format(self, provider, minimal_detection): """created/last_tuned must match YYYY-MM-DD.""" - minimal_detection["metadata"] = {"created": "2026-4-14"} # missing zero pad + minimal_detection["metadata"] = {"maturity": {"created": "2026-4-14"}} # missing zero pad errors = provider.validate_template(minimal_detection) - assert any("metadata.created" in err and "YYYY-MM-DD" in err for err in errors) + assert any("metadata.maturity.created" in err and "YYYY-MM-DD" in err for err in errors) def test_validate_metadata_non_date_string(self, provider, minimal_detection): """Non-date strings rejected for date fields.""" - minimal_detection["metadata"] = {"last_tuned": "yesterday"} + minimal_detection["metadata"] = {"maturity": {"last_tuned": "yesterday"}} errors = provider.validate_template(minimal_detection) - assert any("metadata.last_tuned" in err for err in errors) + assert any("metadata.maturity.last_tuned" in err for err in errors) def test_validate_metadata_tune_count_negative(self, provider, minimal_detection): """tune_count must be >= 0.""" - minimal_detection["metadata"] = {"tune_count": -1} + minimal_detection["metadata"] = {"maturity": {"tune_count": -1}} errors = provider.validate_template(minimal_detection) - assert any("metadata.tune_count" in err and "non-negative" in err for err in errors) + assert any("metadata.maturity.tune_count" in err and "non-negative" in err for err in errors) def test_validate_metadata_tune_count_string(self, provider, minimal_detection): """tune_count must be an int, not 
a string.""" - minimal_detection["metadata"] = {"tune_count": "3"} + minimal_detection["metadata"] = {"maturity": {"tune_count": "3"}} errors = provider.validate_template(minimal_detection) - assert any("metadata.tune_count" in err for err in errors) + assert any("metadata.maturity.tune_count" in err for err in errors) def test_validate_metadata_tune_count_bool_rejected(self, provider, minimal_detection): """Python bool is technically int — must be explicitly rejected.""" - minimal_detection["metadata"] = {"tune_count": True} + minimal_detection["metadata"] = {"maturity": {"tune_count": True}} errors = provider.validate_template(minimal_detection) - assert any("metadata.tune_count" in err for err in errors) + assert any("metadata.maturity.tune_count" in err for err in errors) def test_validate_metadata_tune_count_float(self, provider, minimal_detection): """tune_count must be int, not float.""" - minimal_detection["metadata"] = {"tune_count": 1.5} + minimal_detection["metadata"] = {"maturity": {"tune_count": 1.5}} errors = provider.validate_template(minimal_detection) - assert any("metadata.tune_count" in err for err in errors) + assert any("metadata.maturity.tune_count" in err for err in errors) def test_validate_metadata_tune_count_zero(self, provider, minimal_detection): """tune_count: 0 is valid (boundary).""" - minimal_detection["metadata"] = {"tune_count": 0} + minimal_detection["metadata"] = {"maturity": {"tune_count": 0}} errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_metadata_confidence_valid_values(self, provider, minimal_detection): """Each of the four confidence values must pass.""" for value in ("low", "medium", "high", "validated"): - minimal_detection["metadata"] = {"confidence": value} + minimal_detection["metadata"] = {"maturity": {"confidence": value}} errors = provider.validate_template(minimal_detection) assert errors == [], f"confidence={value} should pass, got {errors}" def 
test_validate_metadata_confidence_invalid(self, provider, minimal_detection): """Confidence value not in enum rejected, error names all allowed values.""" - minimal_detection["metadata"] = {"confidence": "mature"} + minimal_detection["metadata"] = {"maturity": {"confidence": "mature"}} errors = provider.validate_template(minimal_detection) - assert any("metadata.confidence" in err and "low" in err and "validated" in err for err in errors) + assert any("metadata.maturity.confidence" in err and "low" in err and "validated" in err for err in errors) def test_validate_metadata_errors_accumulate(self, provider, minimal_detection): """Multiple metadata errors produce multiple distinct errors (no short-circuit).""" minimal_detection["metadata"] = { - "created": "bad-date", - "tune_count": -1, - "confidence": "not-in-enum", + "maturity": { + "created": "bad-date", + "tune_count": -1, + "confidence": "not-in-enum", + } } errors = provider.validate_template(minimal_detection) metadata_errors = [e for e in errors if "metadata." in e] assert len(metadata_errors) >= 3, f"expected 3+ metadata errors, got {metadata_errors}" - # --- ads: path-ref extension (false_positives / response / validation) --- + # --- ads: path-ref extension (false_positives / response / validation). + # v0.3.0: ads: relocated under metadata.ads (was top-level ads:). 
--- + + @staticmethod + def _set_ads(template, ads): + """Helper: set ADS block at the v0.3.0 path (metadata.ads).""" + template.setdefault("metadata", {})["ads"] = ads + return template def test_validate_ads_fp_ref_dict_valid(self, provider, minimal_detection): """false_positives entry can be {path, label}.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [ - {"path": "knowledge/patterns/aws.md#ci-cd", "label": "CI/CD Terraform"}, - ], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [ + {"path": "knowledge/patterns/aws.md#ci-cd", "label": "CI/CD Terraform"}, + ], + }, + ) errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_ads_fp_ref_dict_label_optional(self, provider, minimal_detection): """label is optional in ref dict.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": "knowledge/patterns/aws.md#ci-cd"}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": "knowledge/patterns/aws.md#ci-cd"}], + }, + ) errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_ads_fp_mixed_forms(self, provider, minimal_detection): """false_positives can mix string refs, inline FP dicts, and ref dicts.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [ - "-> knowledge/patterns/aws.md#legacy", - {"pattern": "P", "characteristics": "C", "tuning": "T", "status": "tuned"}, - {"path": "knowledge/patterns/aws.md#new", "label": "New pattern"}, - ], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [ + "-> knowledge/patterns/aws.md#legacy", + {"pattern": "P", "characteristics": "C", "tuning": "T", "status": "tuned"}, + {"path": "knowledge/patterns/aws.md#new", "label": "New pattern"}, + ], + }, + ) errors = provider.validate_template(minimal_detection) assert errors == [] def 
test_validate_ads_validation_ref_dict(self, provider, minimal_detection): """validation entry can be a ref dict.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "validation": [{"path": "knowledge/validations/foo.md"}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "validation": [{"path": "knowledge/validations/foo.md"}], + }, + ) errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_ads_validation_mixed_forms(self, provider, minimal_detection): """validation can mix strings and ref dicts.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "validation": ["Step 1: do thing", {"path": "knowledge/validations/foo.md"}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "validation": ["Step 1: do thing", {"path": "knowledge/validations/foo.md"}], + }, + ) errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_ads_response_ref_dict(self, provider, minimal_detection): """response can be a ref dict instead of a string.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "response": {"path": "playbooks/aws.md#sg-anomaly", "label": "SG anomaly playbook"}, - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "response": {"path": "playbooks/aws.md#sg-anomaly", "label": "SG anomaly playbook"}, + }, + ) errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_ads_response_string_still_valid(self, provider, minimal_detection): """response: '...' 
string form still valid (backward compat).""" - minimal_detection["ads"] = {"goal": "Detect X", "response": "Investigate user"} + self._set_ads(minimal_detection, {"goal": "Detect X", "response": "Investigate user"}) errors = provider.validate_template(minimal_detection) assert errors == [] def test_validate_ads_ref_dict_missing_path(self, provider, minimal_detection): """Ref dict without 'path' key rejected.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"label": "orphan"}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"label": "orphan"}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("ads.false_positives" in err and "path" in err for err in errors) def test_validate_ads_ref_dict_unknown_key(self, provider, minimal_detection): """Ref dict with unknown keys rejected.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": "x", "labol": "typo"}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": "x", "labol": "typo"}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("labol" in err for err in errors) def test_validate_ads_ref_dict_empty_path(self, provider, minimal_detection): """Ref dict with empty path rejected.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": ""}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": ""}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("path" in err and "non-empty" in err for err in errors) def test_validate_ads_ref_dict_whitespace_path(self, provider, minimal_detection): """Ref dict with whitespace-only path rejected.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": " "}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": " 
"}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("path" in err for err in errors) def test_validate_ads_ref_dict_path_with_space(self, provider, minimal_detection): """Ref dict path containing whitespace rejected.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": "has space"}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": "has space"}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("path" in err and "whitespace" in err for err in errors) def test_validate_ads_ref_dict_path_non_string(self, provider, minimal_detection): """Ref dict path must be a string.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": 123}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": 123}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("path" in err and "string" in err for err in errors) def test_validate_ads_ref_dict_label_non_string(self, provider, minimal_detection): """Ref dict label must be a string when present.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": "x", "label": 123}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": "x", "label": 123}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("label" in err and "string" in err for err in errors) def test_validate_ads_ref_dict_label_empty(self, provider, minimal_detection): """Ref dict label must be non-empty when present.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [{"path": "x", "label": ""}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [{"path": "x", "label": ""}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("label" in err and "non-empty" in err 
for err in errors) def test_validate_ads_validation_inline_dict_rejected(self, provider, minimal_detection): """validation has no inline-dict form — a non-ref dict is rejected.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "validation": [{"characteristics": "oops"}], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "validation": [{"characteristics": "oops"}], + }, + ) errors = provider.validate_template(minimal_detection) assert any("ads.validation" in err and ("strings" in err or "ref" in err) for err in errors) def test_validate_ads_response_dict_unknown_key_rejected(self, provider, minimal_detection): """response dict treated as ref dict — unknown keys rejected.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "response": {"path": "x", "note": "extra"}, - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "response": {"path": "x", "note": "extra"}, + }, + ) errors = provider.validate_template(minimal_detection) assert any("note" in err for err in errors) def test_validate_ads_fp_inline_dict_strict_keys(self, provider, minimal_detection): """Inline FP dict keys must be in {pattern, characteristics, tuning, status}.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [ - {"pattern": "P", "charactaristics": "typo", "tuning": "T", "status": "tuned"}, - ], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [ + {"pattern": "P", "charactaristics": "typo", "tuning": "T", "status": "tuned"}, + ], + }, + ) errors = provider.validate_template(minimal_detection) assert any("charactaristics" in err for err in errors) def test_validate_ads_fp_entry_wrong_type(self, provider, minimal_detection): """false_positives entries must be string or dict.""" - minimal_detection["ads"] = { - "goal": "Detect X", - "false_positives": [123], - } + self._set_ads( + minimal_detection, + { + "goal": "Detect X", + "false_positives": [123], + }, + ) errors = 
provider.validate_template(minimal_detection) assert any("ads.false_positives" in err for err in errors) def test_validate_ads_response_wrong_type(self, provider, minimal_detection): """response must be string or ref dict, not list.""" - minimal_detection["ads"] = {"goal": "Detect X", "response": ["bad"]} + self._set_ads(minimal_detection, {"goal": "Detect X", "response": ["bad"]}) errors = provider.validate_template(minimal_detection) assert any("ads.response" in err for err in errors) @@ -822,11 +904,11 @@ def test_validate_ads_response_wrong_type(self, provider, minimal_detection): @pytest.fixture def rich_template(self): - """Template populated with both metadata: and ads: blocks, for leak tests.""" + """Template populated with metadata.maturity and metadata.ads (v0.3.0 shape).""" return { "resource_id": "test___test___rich", "name": "Rich Template", - "description": "Template with both metadata: and ads: populated", + "description": "Template with metadata.maturity + metadata.ads populated", "severity": 50, "status": "active", "search": { @@ -837,66 +919,62 @@ def rich_template(self): }, "mitre_attack": ["TA0005:T1562"], "metadata": { - "created": "2026-01-15", - "last_tuned": "2026-04-10", - "tune_count": 3, - "confidence": "high", - }, - "ads": { - "goal": "Detect the thing", - "blind_spots": ["blind to X"], - "priority_rationale": "Medium — commodity technique", - "false_positives": [ - {"path": "knowledge/patterns/aws.md#ci-cd", "label": "CI/CD"}, - ], - "response": {"path": "playbooks/aws.md#sg-anomaly"}, + "maturity": { + "created": "2026-01-15", + "last_tuned": "2026-04-10", + "tune_count": 3, + "confidence": "high", + }, + "ads": { + "goal": "Detect the thing", + "blind_spots": ["blind to X"], + "priority_rationale": "Medium — commodity technique", + "false_positives": [ + {"path": "knowledge/patterns/aws.md#ci-cd", "label": "CI/CD"}, + ], + "response": {"path": "playbooks/aws.md#sg-anomaly"}, + }, }, } - def 
test_create_payload_excludes_metadata_and_ads(self, provider, rich_template): - """POST (create) payload must not contain metadata: or ads: keys.""" + def test_create_payload_excludes_metadata(self, provider, rich_template): + """POST (create) payload must not contain the metadata: namespace.""" payload = provider._prepare_rule_payload(rich_template) assert "metadata" not in payload, f"metadata: leaked into create payload: {payload}" assert "ads" not in payload, f"ads: leaked into create payload: {payload}" - def test_patch_payload_excludes_metadata_and_ads(self, provider, rich_template): - """PATCH (update) payload must not contain metadata: or ads: keys.""" + def test_patch_payload_excludes_metadata(self, provider, rich_template): + """PATCH (update) payload must not contain the metadata: namespace.""" payload = provider._prepare_patch_payload(rich_template) assert "metadata" not in payload, f"metadata: leaked into patch payload: {payload}" assert "ads" not in payload, f"ads: leaked into patch payload: {payload}" def test_hash_unchanged_when_metadata_mutates(self, provider, rich_template): - """Editing any metadata: field must not change the content hash.""" + """Editing any metadata: field (maturity or ads) must not change the content hash.""" baseline = provider.compute_content_hash(rich_template) mutated = { **rich_template, "metadata": { - "created": "2026-01-15", - "last_tuned": "2026-04-16", # changed - "tune_count": 4, # changed - "confidence": "validated", # changed + "maturity": { + "created": "2026-01-15", + "last_tuned": "2026-04-16", # changed + "tune_count": 4, # changed + "confidence": "validated", # changed + }, + "ads": { + "goal": "Detect the thing (revised)", + "strategy_abstract": "New abstract", + "blind_spots": ["blind to X", "also blind to Y"], + "false_positives": [ + {"path": "knowledge/patterns/aws.md#ci-cd", "label": "CI/CD"}, + {"path": "knowledge/patterns/aws.md#new", "label": "New pattern"}, + ], + "response": "Inline response now", + 
}, }, } assert provider.compute_content_hash(mutated) == baseline, "metadata: mutation must not change content hash" - def test_hash_unchanged_when_ads_mutates(self, provider, rich_template): - """Editing any ads: field (including ref dicts) must not change the content hash.""" - baseline = provider.compute_content_hash(rich_template) - mutated = { - **rich_template, - "ads": { - "goal": "Detect the thing (revised)", - "strategy_abstract": "New abstract", - "blind_spots": ["blind to X", "also blind to Y"], - "false_positives": [ - {"path": "knowledge/patterns/aws.md#ci-cd", "label": "CI/CD"}, - {"path": "knowledge/patterns/aws.md#new", "label": "New pattern"}, - ], - "response": "Inline response now", - }, - } - assert provider.compute_content_hash(mutated) == baseline, "ads: mutation must not change content hash" - def test_hash_changes_when_real_field_mutates(self, provider, rich_template): """Sanity: content hash MUST change when a real CONTENT_FIELDS member mutates.""" baseline = provider.compute_content_hash(rich_template) @@ -905,6 +983,87 @@ def test_hash_changes_when_real_field_mutates(self, provider, rich_template): "severity change must produce a different hash (sanity check)" ) + # --- v0.3.0 metadata namespace redesign --- + + def test_v03_new_shape_metadata_ads_validates(self, provider, minimal_detection): + minimal_detection["metadata"] = { + "ads": {"goal": "Detect something", "mitre_attack": ["TA0011:T1090.003"]}, + } + assert provider.validate_template(minimal_detection) == [] + + def test_v03_new_shape_metadata_maturity_validates(self, provider, minimal_detection): + minimal_detection["metadata"] = { + "maturity": {"created": "2026-04-16", "tune_count": 2, "confidence": "high"}, + } + assert provider.validate_template(minimal_detection) == [] + + def test_v03_new_shape_both_blocks_together(self, provider, minimal_detection): + minimal_detection["metadata"] = { + "maturity": {"created": "2026-04-16"}, + "ads": {"goal": "Detect something"}, + } + 
assert provider.validate_template(minimal_detection) == [] + + def test_v03_old_top_level_ads_rejected(self, provider, minimal_detection): + minimal_detection["ads"] = {"goal": "Detect something"} + errors = provider.validate_template(minimal_detection) + # Exact-string guard against silent refactors — CHANGELOG pointer must remain. + assert any( + "Top-level 'ads:' is removed in v0.3.0" in e and "metadata.ads" in e and "CHANGELOG.md" in e for e in errors + ) + + def test_v03_old_flat_metadata_rejected(self, provider, minimal_detection): + minimal_detection["metadata"] = {"created": "2026-04-16", "tune_count": 2} + errors = provider.validate_template(minimal_detection) + assert any( + "Top-level 'metadata:' now reserves sub-namespaces" in e and "metadata.maturity" in e for e in errors + ) + + def test_v03_metadata_ads_goal_required(self, provider, minimal_detection): + minimal_detection["metadata"] = {"ads": {"mitre_attack": ["x"]}} # missing goal + errors = provider.validate_template(minimal_detection) + assert any("ads.goal is required" in e for e in errors) + + def test_v03_metadata_ads_unknown_field(self, provider, minimal_detection): + minimal_detection["metadata"] = {"ads": {"goal": "g", "bogus": 1}} + errors = provider.validate_template(minimal_detection) + assert any("Unknown ads fields: bogus" in e for e in errors) + + def test_v03_metadata_ads_ref_dict_false_positive(self, provider, minimal_detection): + # Existing ref-dict form (from v0.2.x spec) continues to work under new path. 
+ minimal_detection["metadata"] = { + "ads": { + "goal": "g", + "false_positives": [ + {"path": "knowledge/patterns/net.md#anchor", "label": "ok"}, + ], + } + } + assert provider.validate_template(minimal_detection) == [] + + def test_v03_metadata_not_dict(self, provider, minimal_detection): + minimal_detection["metadata"] = "oops" + errors = provider.validate_template(minimal_detection) + assert any("'metadata' must be a dictionary" in e for e in errors) + + def test_v03_metadata_edits_do_not_change_content_hash(self, provider, minimal_detection): + # The whole point of the metadata: namespace — plan must show NO_CHANGE. + base_hash = provider.compute_content_hash(minimal_detection) + with_metadata = dict(minimal_detection) + with_metadata["metadata"] = { + "maturity": {"created": "2026-04-16", "tune_count": 9}, + "ads": {"goal": "g"}, + "acme_corp": {"anything": True}, + } + assert provider.compute_content_hash(with_metadata) == base_hash + + def test_v03_internal_prefix_fields_do_not_change_content_hash(self, provider, minimal_detection): + base_hash = provider.compute_content_hash(minimal_detection) + with_internal = dict(minimal_detection) + with_internal["_template_path"] = "/tmp/x.yaml" + with_internal["_probe_future_internal"] = "z" + assert provider.compute_content_hash(with_internal) == base_hash + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tests/unit/test_examples.py b/tests/unit/test_examples.py new file mode 100644 index 0000000..b10c217 --- /dev/null +++ b/tests/unit/test_examples.py @@ -0,0 +1,84 @@ +"""Parity test: every reference template under examples/resources/ must validate +cleanly against its matching provider. Guards against silent drift of the +reference YAMLs out of schema. 
+""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest +import yaml + +from talonctl.providers.dashboard_provider import DashboardProvider +from talonctl.providers.detection_provider import DetectionProvider +from talonctl.providers.lookup_file_provider import LookupFileProvider +from talonctl.providers.rtr_put_file_provider import RTRPutFileProvider +from talonctl.providers.rtr_script_provider import RTRScriptProvider +from talonctl.providers.saved_search_provider import SavedSearchProvider +from talonctl.providers.workflow_provider import WorkflowProvider + +EXAMPLES_DIR = Path(__file__).parent.parent.parent / "examples" / "resources" + +PROVIDER_BY_TYPE = { + "detection": DetectionProvider, + "saved_search": SavedSearchProvider, + "dashboard": DashboardProvider, + "workflow": WorkflowProvider, + "lookup_file": LookupFileProvider, + "rtr_script": RTRScriptProvider, + "rtr_put_file": RTRPutFileProvider, +} + + +def _example_files(): + if not EXAMPLES_DIR.exists(): + return [] + return sorted(p for p in EXAMPLES_DIR.glob("*.yaml")) + + +def _resource_type_for(tmpl, yaml_path): + # Explicit `type:` wins; otherwise map filename stems that have a common prefix + # (e.g. saved_search_function.yaml -> saved_search). 
+ rt = tmpl.get("type") + if rt: + return rt + stem = yaml_path.stem + if stem.startswith("saved_search"): + return "saved_search" + return stem + + +def _build_provider(cls): + if cls is WorkflowProvider: + with patch("talonctl.providers.workflow_provider.load_credentials") as mock_creds: + mock_creds.return_value = { + "falcon_client_id": "test", + "falcon_client_secret": "test", + "base_url": "https://api.crowdstrike.com", + } + with patch("talonctl.providers.workflow_provider.Workflows"): + return cls(None) + return cls(None) + + +@pytest.mark.parametrize("yaml_path", _example_files(), ids=lambda p: p.name) +def test_example_template_validates(yaml_path): + with open(yaml_path) as f: + tmpl = yaml.safe_load(f) + + # Providers that resolve file-relative paths (rtr_put_file, rtr_script) expect + # the template loader to set _template_path — simulate that here. + tmpl["_template_path"] = str(yaml_path) + + resource_type = _resource_type_for(tmpl, yaml_path) + provider_cls = PROVIDER_BY_TYPE.get(resource_type) + assert provider_cls is not None, ( + f"{yaml_path.name}: unknown resource type {resource_type!r}. " + f"Either add 'type:' to the template or rename the file to match a provider." 
+ ) + + provider = _build_provider(provider_cls) + errors = provider.validate_template(tmpl) + assert errors == [], f"{yaml_path.name} failed validation: {errors}" diff --git a/tests/unit/test_lookup_file_provider.py b/tests/unit/test_lookup_file_provider.py index 379e433..8473628 100644 --- a/tests/unit/test_lookup_file_provider.py +++ b/tests/unit/test_lookup_file_provider.py @@ -347,6 +347,40 @@ def test_valid_search_domains(self, provider): finally: os.unlink(template["source"]) + # --- v0.3.0 metadata namespace redesign --- + + @pytest.fixture + def minimal_lookup(self, tmp_path): + csv_file = tmp_path / "ips.csv" + csv_file.write_text("ip\n1.2.3.4\n") + return { + "resource_id": "x", + "name": "ips", + "format": "csv", + "description": "test lookup", + "source": str(csv_file), + } + + def test_v03_metadata_maturity_validates_on_lookup(self, provider, minimal_lookup): + minimal_lookup["metadata"] = {"maturity": {"created": "2026-04-16"}} + assert provider.validate_template(minimal_lookup) == [] + + def test_v03_metadata_ads_rejected_on_lookup(self, provider, minimal_lookup): + minimal_lookup["metadata"] = {"ads": {"goal": "g"}} + errors = provider.validate_template(minimal_lookup) + assert any("metadata.ads is only supported on detection resources" in e and "lookup_file" in e for e in errors) + + def test_v03_old_top_level_ads_rejected_on_lookup(self, provider, minimal_lookup): + minimal_lookup["ads"] = {"goal": "g"} + errors = provider.validate_template(minimal_lookup) + assert any("Top-level 'ads:' is removed in v0.3.0" in e for e in errors) + + def test_v03_metadata_edits_do_not_change_content_hash(self, provider, minimal_lookup): + base_hash = provider.compute_content_hash(minimal_lookup) + with_metadata = dict(minimal_lookup) + with_metadata["metadata"] = {"maturity": {"tune_count": 3}} + assert provider.compute_content_hash(with_metadata) == base_hash + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git 
a/tests/unit/test_metadata_validators.py b/tests/unit/test_metadata_validators.py new file mode 100644 index 0000000..682dbf1 --- /dev/null +++ b/tests/unit/test_metadata_validators.py @@ -0,0 +1,142 @@ +"""Unit tests for core/metadata_validators.py. + +The maturity validator is universal across all seven resource types. These tests +verify shape only — the caller decides whether to run them (e.g. every provider's +validate_template adds the returned errors to its own error list). +""" + +from __future__ import annotations + +import pytest + +from talonctl.core.metadata_validators import reject_old_shape, validate_maturity + + +class TestValidateMaturityAbsent: + def test_no_metadata_key_is_valid(self): + assert validate_maturity({"name": "x"}) == [] + + def test_metadata_without_maturity_subkey_is_valid(self): + assert validate_maturity({"metadata": {"ads": {}}}) == [] + + def test_empty_maturity_block_is_valid(self): + # All four fields are optional when the block is present. + assert validate_maturity({"metadata": {"maturity": {}}}) == [] + + +class TestValidateMaturityShape: + def test_metadata_not_a_dict(self): + errors = validate_maturity({"metadata": "oops"}) + assert errors == ["'metadata' must be a dictionary"] + + def test_maturity_not_a_dict(self): + errors = validate_maturity({"metadata": {"maturity": "oops"}}) + assert errors == ["'metadata.maturity' must be a dictionary"] + + def test_unknown_maturity_field(self): + errors = validate_maturity({"metadata": {"maturity": {"typo": "v", "confidence": "high"}}}) + assert len(errors) == 1 + assert "Unknown metadata.maturity key(s): typo" in errors[0] + assert "Known keys:" in errors[0] + + def test_multiple_unknown_fields_sorted(self): + errors = validate_maturity({"metadata": {"maturity": {"zzz": 1, "aaa": 2}}}) + assert len(errors) == 1 + assert "aaa, zzz" in errors[0] + + +class TestValidateMaturityDateFields: + @pytest.mark.parametrize("field", ["created", "last_tuned"]) + def 
test_bad_date_format_rejected(self, field): + errors = validate_maturity({"metadata": {"maturity": {field: "not-a-date"}}}) + assert any(f"metadata.maturity.{field}" in e for e in errors) + + @pytest.mark.parametrize("field", ["created", "last_tuned"]) + def test_valid_iso_date_accepted(self, field): + assert validate_maturity({"metadata": {"maturity": {field: "2026-04-16"}}}) == [] + + def test_last_tuned_allows_null(self): + assert validate_maturity({"metadata": {"maturity": {"last_tuned": None}}}) == [] + + def test_created_rejects_null(self): + errors = validate_maturity({"metadata": {"maturity": {"created": None}}}) + assert any("metadata.maturity.created" in e for e in errors) + + +class TestValidateMaturityTuneCount: + def test_zero_ok(self): + assert validate_maturity({"metadata": {"maturity": {"tune_count": 0}}}) == [] + + def test_positive_int_ok(self): + assert validate_maturity({"metadata": {"maturity": {"tune_count": 42}}}) == [] + + def test_negative_rejected(self): + errors = validate_maturity({"metadata": {"maturity": {"tune_count": -1}}}) + assert any("tune_count" in e for e in errors) + + def test_bool_rejected(self): + # True would accidentally pass isinstance(True, int); explicitly rejected. 
+ errors = validate_maturity({"metadata": {"maturity": {"tune_count": True}}}) + assert any("tune_count" in e for e in errors) + + def test_float_rejected(self): + errors = validate_maturity({"metadata": {"maturity": {"tune_count": 1.5}}}) + assert any("tune_count" in e for e in errors) + + +class TestValidateMaturityConfidence: + @pytest.mark.parametrize("val", ["low", "medium", "high", "validated"]) + def test_allowed_values(self, val): + assert validate_maturity({"metadata": {"maturity": {"confidence": val}}}) == [] + + def test_unknown_value_rejected(self): + errors = validate_maturity({"metadata": {"maturity": {"confidence": "supreme"}}}) + assert any("metadata.maturity.confidence" in e for e in errors) + assert any("low, medium, high, validated" in e for e in errors) + + +class TestValidateMaturityAccumulatesErrors: + def test_multiple_problems_all_reported(self): + errors = validate_maturity( + { + "metadata": { + "maturity": { + "created": "bad-date", + "tune_count": -5, + "confidence": "wrong", + } + } + } + ) + # Should surface three distinct errors in one pass. 
+ assert len(errors) == 3 + + +class TestRejectOldShape: + def test_clean_template_no_errors(self): + assert reject_old_shape({"name": "x", "metadata": {"maturity": {}}}) == [] + + def test_top_level_ads_rejected(self): + errors = reject_old_shape({"name": "x", "ads": {"goal": "g"}}) + assert len(errors) == 1 + assert "Top-level 'ads:' is removed in v0.3.0" in errors[0] + assert "metadata.ads" in errors[0] + assert "CHANGELOG.md" in errors[0] + + def test_flat_metadata_maturity_field_at_root_rejected(self): + # Old-style top-level metadata: {created: ..., tune_count: ...} + errors = reject_old_shape({"metadata": {"created": "2026-04-16", "tune_count": 2}}) + assert len(errors) == 1 + assert "Top-level 'metadata:' now reserves sub-namespaces" in errors[0] + assert "metadata.maturity" in errors[0] + + def test_new_shape_with_maturity_nested_not_rejected(self): + # metadata.maturity.created is the new correct shape. + assert reject_old_shape({"metadata": {"maturity": {"created": "2026-04-16"}}}) == [] + + def test_metadata_with_only_third_party_namespace_not_rejected(self): + assert reject_old_shape({"metadata": {"acme_corp": {"anything": True}}}) == [] + + def test_both_old_ads_and_flat_metadata_both_reported(self): + errors = reject_old_shape({"ads": {}, "metadata": {"created": "2026-04-16"}}) + assert len(errors) == 2 diff --git a/tests/unit/test_old_shape_rejection.py b/tests/unit/test_old_shape_rejection.py new file mode 100644 index 0000000..ea38237 --- /dev/null +++ b/tests/unit/test_old_shape_rejection.py @@ -0,0 +1,133 @@ +"""Lock-in tests: every provider rejects pre-v0.3.0 top-level ads: and flat metadata: +with the exact CHANGELOG-pointing error. Protects the migration escape hatch from +silent refactors. 
+""" + +from __future__ import annotations + +import pytest + +from talonctl.providers.dashboard_provider import DashboardProvider +from talonctl.providers.detection_provider import DetectionProvider +from talonctl.providers.lookup_file_provider import LookupFileProvider +from talonctl.providers.rtr_put_file_provider import RTRPutFileProvider +from talonctl.providers.rtr_script_provider import RTRScriptProvider +from talonctl.providers.saved_search_provider import SavedSearchProvider +from talonctl.providers.workflow_provider import WorkflowProvider + + +def _minimal_template_for(provider_cls, tmp_path): + """Return a shape that passes other validation requirements per provider, so + the only source of errors is the old-shape rejection under test.""" + if provider_cls is DetectionProvider: + return { + "resource_id": "x", + "name": "n", + "description": "d", + "severity": 50, + "search": {"filter": "x"}, + } + if provider_cls is SavedSearchProvider: + return { + "resource_id": "x", + "$schema": "https://schemas.humio.com/query/v0.5.0", + "name": "n", + "queryString": "x", + "_search_domain": "falcon", + } + if provider_cls is DashboardProvider: + return { + "resource_id": "x", + "name": "n", + "sections": {"s0": {"order": 0, "widgetIds": ["w0"]}}, + "widgets": {"w0": {"type": "note", "text": "hi"}}, + } + if provider_cls is WorkflowProvider: + return { + "resource_id": "x", + "name": "n", + "enabled": True, + "trigger": {"event": "e", "type": "Signal"}, + "actions": {"a": {}}, + "conditions": {}, + } + if provider_cls is LookupFileProvider: + csv = tmp_path / "ips.csv" + csv.write_text("ip\n1.2.3.4\n") + return { + "resource_id": "x", + "name": "n", + "format": "csv", + "description": "d", + "source": str(csv), + } + if provider_cls is RTRScriptProvider: + return { + "resource_id": "x", + "name": "n", + "description": "d", + "platform": "linux", + "permission_type": "private", + "content": "#!/bin/sh\necho hi\n", + } + if provider_cls is RTRPutFileProvider: + 
bin_file = tmp_path / "payload.bin" + bin_file.write_bytes(b"\x00\x01") + return { + "resource_id": "x", + "name": "n", + "description": "d", + "file_path": "payload.bin", + "_template_path": str(tmp_path / "tmpl.yaml"), + } + raise AssertionError(f"no minimal template for {provider_cls!r}") + + +ALL_PROVIDERS = [ + DetectionProvider, + SavedSearchProvider, + DashboardProvider, + WorkflowProvider, + LookupFileProvider, + RTRScriptProvider, + RTRPutFileProvider, +] + + +def _build_provider(cls): + # Providers all accept (falcon_client, config). WorkflowProvider needs credential patching. + if cls is WorkflowProvider: + from unittest.mock import patch + + with patch("talonctl.providers.workflow_provider.load_credentials") as mock_creds: + mock_creds.return_value = { + "falcon_client_id": "test", + "falcon_client_secret": "test", + "base_url": "https://api.crowdstrike.com", + } + with patch("talonctl.providers.workflow_provider.Workflows"): + return cls(None) + return cls(None) + + +@pytest.mark.parametrize("cls", ALL_PROVIDERS) +def test_every_provider_rejects_top_level_ads(cls, tmp_path): + provider = _build_provider(cls) + tmpl = _minimal_template_for(cls, tmp_path) + tmpl["ads"] = {"goal": "g"} + errors = provider.validate_template(tmpl) + assert any( + "Top-level 'ads:' is removed in v0.3.0" in e and "metadata.ads" in e and "CHANGELOG.md" in e for e in errors + ), f"{cls.__name__} missing migration pointer: {errors!r}" + + +@pytest.mark.parametrize("cls", ALL_PROVIDERS) +def test_every_provider_rejects_flat_metadata_maturity(cls, tmp_path): + provider = _build_provider(cls) + tmpl = _minimal_template_for(cls, tmp_path) + tmpl["metadata"] = {"created": "2026-04-16", "tune_count": 2} + errors = provider.validate_template(tmpl) + assert any( + "Top-level 'metadata:' now reserves sub-namespaces" in e and "metadata.maturity" in e and "CHANGELOG.md" in e + for e in errors + ), f"{cls.__name__} missing migration pointer: {errors!r}" diff --git 
a/tests/unit/test_rtr_put_file_provider.py b/tests/unit/test_rtr_put_file_provider.py index e0bdb6c..e498a2a 100644 --- a/tests/unit/test_rtr_put_file_provider.py +++ b/tests/unit/test_rtr_put_file_provider.py @@ -404,6 +404,35 @@ def test_apply_delete_alias(self, provider_with_api): result = provider_with_api.apply_delete("abc123") assert result["id"] == "abc123" + # --- v0.3.0 metadata namespace redesign --- + + @pytest.fixture + def minimal_rtr_put(self, tmp_path): + bin_file = tmp_path / "payload.bin" + bin_file.write_bytes(b"\x00\x01\x02") + return { + "resource_id": "x", + "name": "payload", + "description": "test put file", + "file_path": "payload.bin", + "_template_path": str(tmp_path / "tmpl.yaml"), + } + + def test_v03_metadata_maturity_validates_on_rtr_put(self, provider, minimal_rtr_put): + minimal_rtr_put["metadata"] = {"maturity": {"created": "2026-04-16"}} + assert provider.validate_template(minimal_rtr_put) == [] + + def test_v03_metadata_ads_rejected_on_rtr_put(self, provider, minimal_rtr_put): + minimal_rtr_put["metadata"] = {"ads": {"goal": "g"}} + errors = provider.validate_template(minimal_rtr_put) + assert any("metadata.ads is only supported on detection resources" in e and "rtr_put_file" in e for e in errors) + + def test_v03_metadata_edits_do_not_change_content_hash(self, provider, minimal_rtr_put): + base_hash = provider.compute_content_hash(minimal_rtr_put) + with_metadata = dict(minimal_rtr_put) + with_metadata["metadata"] = {"maturity": {"tune_count": 2}} + assert provider.compute_content_hash(with_metadata) == base_hash + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tests/unit/test_rtr_script_provider.py b/tests/unit/test_rtr_script_provider.py index 877a8d3..e41db4a 100644 --- a/tests/unit/test_rtr_script_provider.py +++ b/tests/unit/test_rtr_script_provider.py @@ -540,6 +540,44 @@ def test_apply_delete_alias(self, provider_with_api): result = provider_with_api.apply_delete("abc123") assert result["id"] == 
"abc123" + # --- v0.3.0 metadata namespace redesign --- + + @pytest.fixture + def minimal_rtr_script(self, tmp_path): + script = tmp_path / "hello.sh" + script.write_text("#!/bin/sh\necho hi\n") + return { + "resource_id": "x", + "name": "hello", + "description": "say hi", + "platform": "linux", + "permission_type": "private", + "content": "#!/bin/sh\necho hi\n", + "_template_path": str(tmp_path / "tmpl.yaml"), + } + + def test_v03_metadata_maturity_validates_on_rtr_script(self, provider, minimal_rtr_script): + minimal_rtr_script["metadata"] = {"maturity": {"created": "2026-04-16"}} + assert provider.validate_template(minimal_rtr_script) == [] + + def test_v03_metadata_ads_rejected_on_rtr_script(self, provider, minimal_rtr_script): + minimal_rtr_script["metadata"] = {"ads": {"goal": "g"}} + errors = provider.validate_template(minimal_rtr_script) + assert any("metadata.ads is only supported on detection resources" in e and "rtr_script" in e for e in errors) + + def test_v03_metadata_edits_do_not_change_content_hash(self, provider, minimal_rtr_script): + base_hash = provider.compute_content_hash(minimal_rtr_script) + with_metadata = dict(minimal_rtr_script) + with_metadata["metadata"] = {"maturity": {"tune_count": 7}} + assert provider.compute_content_hash(with_metadata) == base_hash + + def test_v03_template_path_still_consumed_before_strip(self, provider, minimal_rtr_script): + # Regression guard: if strip_for_hash runs before _template_path is consumed, + # the script-file lookup will silently fall back to "." and produce a + # different hash. This test asserts behavior does NOT regress. 
+ h = provider.compute_content_hash(minimal_rtr_script) + assert isinstance(h, str) and len(h) == 64 + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tests/unit/test_saved_search_provider.py b/tests/unit/test_saved_search_provider.py index ce8e841..90ee92b 100644 --- a/tests/unit/test_saved_search_provider.py +++ b/tests/unit/test_saved_search_provider.py @@ -446,3 +446,57 @@ def test_update_resource_failure(self, provider, mock_falcon): with pytest.raises(RuntimeError, match="Failed to update saved query"): provider.update_resource("query-123", template, current_state) + + # --- v0.3.0 metadata namespace redesign --- + + @pytest.fixture + def minimal_saved_search(self): + """Minimal valid saved_search template (v0.3.0 shape).""" + return { + "resource_id": "x", + "$schema": "https://schemas.humio.com/query/v0.5.0", + "name": "test_query", + "queryString": "#repo=test", + "_search_domain": "falcon", + } + + def test_v03_metadata_maturity_validates_on_saved_search(self, provider, minimal_saved_search): + minimal_saved_search["metadata"] = {"maturity": {"created": "2026-04-16", "confidence": "high"}} + assert provider.validate_template(minimal_saved_search) == [] + + def test_v03_metadata_ads_rejected_on_non_detection(self, provider, minimal_saved_search): + minimal_saved_search["metadata"] = {"ads": {"goal": "g"}} + errors = provider.validate_template(minimal_saved_search) + assert any("metadata.ads is only supported on detection resources" in e and "saved_search" in e for e in errors) + + def test_v03_old_top_level_ads_rejected_on_saved_search(self, provider, minimal_saved_search): + minimal_saved_search["ads"] = {"goal": "g"} + errors = provider.validate_template(minimal_saved_search) + assert any("Top-level 'ads:' is removed in v0.3.0" in e for e in errors) + + def test_v03_metadata_edits_do_not_change_content_hash(self, provider, minimal_saved_search): + base_hash = provider.compute_content_hash(minimal_saved_search) + with_metadata = 
dict(minimal_saved_search) + with_metadata["metadata"] = { + "maturity": {"created": "2026-04-16", "tune_count": 3}, + "acme_corp": {"x": 1}, + } + assert provider.compute_content_hash(with_metadata) == base_hash + + def test_v03_payload_strips_metadata_and_internal_fields(self, provider, minimal_saved_search): + tmpl = dict(minimal_saved_search) + tmpl["metadata"] = {"maturity": {"created": "2026-04-16"}, "acme_corp": {"a": 1}} + tmpl["_template_path"] = "/tmp/x.yaml" + tmpl["_probe_internal"] = "should-not-leak" + # Saved-search provider builds clean_template inline in create/update. + # Verify the helper strips the reserved/internal keys BEFORE the API call. + from talonctl.core.template_sanitizer import strip_for_api + + cleaned = strip_for_api(tmpl) + assert "metadata" not in cleaned + assert "_template_path" not in cleaned + assert "_probe_internal" not in cleaned + assert "resource_id" not in cleaned + # Provider-owned fields must survive. + assert cleaned["name"] == "test_query" + assert cleaned["queryString"] == "#repo=test" diff --git a/tests/unit/test_workflow_provider.py b/tests/unit/test_workflow_provider.py index 81626d4..92a1a77 100644 --- a/tests/unit/test_workflow_provider.py +++ b/tests/unit/test_workflow_provider.py @@ -315,6 +315,39 @@ def test_validate_template_requires_resource_id(self, provider): errors = provider.validate_template(template) assert any("resource_id" in e.lower() for e in errors) + # --- v0.3.0 metadata namespace redesign --- + + @pytest.fixture + def minimal_workflow(self): + return { + "resource_id": "x", + "name": "Test Workflow", + "enabled": True, + "trigger": {"event": "Investigatable/NGSIEM", "type": "Signal"}, + "actions": {"a": {}}, + "conditions": {}, + } + + def test_v03_metadata_maturity_validates_on_workflow(self, provider, minimal_workflow): + minimal_workflow["metadata"] = {"maturity": {"created": "2026-04-16"}} + assert provider.validate_template(minimal_workflow) == [] + + def 
test_v03_metadata_ads_rejected_on_workflow(self, provider, minimal_workflow): + minimal_workflow["metadata"] = {"ads": {"goal": "g"}} + errors = provider.validate_template(minimal_workflow) + assert any("metadata.ads is only supported on detection resources" in e and "workflow" in e for e in errors) + + def test_v03_old_top_level_ads_rejected_on_workflow(self, provider, minimal_workflow): + minimal_workflow["ads"] = {"goal": "g"} + errors = provider.validate_template(minimal_workflow) + assert any("Top-level 'ads:' is removed in v0.3.0" in e for e in errors) + + def test_v03_metadata_edits_do_not_change_content_hash(self, provider, minimal_workflow): + base_hash = provider.compute_content_hash(minimal_workflow) + with_metadata = dict(minimal_workflow) + with_metadata["metadata"] = {"maturity": {"tune_count": 5}, "acme": {"x": 1}} + assert provider.compute_content_hash(with_metadata) == base_hash + if __name__ == "__main__": pytest.main([__file__, "-v"])