diff --git a/src/talonctl/commands/validate.py b/src/talonctl/commands/validate.py index b460e84..e8e6675 100644 --- a/src/talonctl/commands/validate.py +++ b/src/talonctl/commands/validate.py @@ -9,18 +9,36 @@ parse_filters, init_orchestrator, ) +from talonctl.utils.auth import load_credentials @click.command() @filter_options @state_options +@click.option( + "--queries", + "-Q", + "parse_queries", + is_flag=True, + help="After schema validation, CQL-parse every query against NGSIEM. Requires credentials.", +) @click.pass_context -def validate(ctx, resources, tags, names, state_file): +def validate(ctx, resources, tags, names, state_file, parse_queries): """Validate all templates without deploying.""" console.print("[bold blue]Validating templates...[/bold blue]\n") verbose = ctx.obj.get("verbose", False) - orchestrator = init_orchestrator(state_file=state_file, require_credentials=False) + if parse_queries: + if load_credentials() is None: + console.print("[red]✗ --queries requires configured credentials.[/red]") + console.print(" Run 'talonctl auth setup' or unset --queries for schema-only validation.") + raise SystemExit(1) + + # Schema validation is always offline — no credentials needed. + orchestrator = init_orchestrator( + state_file=state_file, + require_credentials=parse_queries, + ) filters = parse_filters(resources, tags, names) try: @@ -29,9 +47,25 @@ def validate(ctx, resources, tags, names, state_file): results = orchestrator.validate(**filters) formatter = PlanFormatter(console, verbose=verbose) formatter.format_validation_results(results) + has_errors = any(errors for errors in results.values() if errors) if has_errors: raise SystemExit(1) + + if parse_queries: + try: + query_results = orchestrator.validate_queries(**filters) + except ValueError as e: + console.print("[red]✗ --queries requires configured credentials.[/red]") + console.print(" Run 'talonctl auth setup' or unset --queries for schema-only validation.") + if verbose: + console.print(f" [dim]{e}[/dim]") + raise SystemExit(1) + + formatter.format_query_validation(query_results) + if any(not r.is_valid for r in query_results): + raise SystemExit(1) + except SystemExit: raise except Exception as e: diff --git a/src/talonctl/commands/validate_query.py b/src/talonctl/commands/validate_query.py index 28def0a..3fed4ef 100644 --- a/src/talonctl/commands/validate_query.py +++ b/src/talonctl/commands/validate_query.py @@ -65,7 +65,7 @@ def validate_query(ctx, query, query_file, template): if result["valid"]: console.print("VALID") else: - console.print(f"INVALID: {result.get('message', 'Unknown error')}") + console.print(f"INVALID: {result['message']}") raise SystemExit(1) except SystemExit: diff --git a/src/talonctl/core/__init__.py b/src/talonctl/core/__init__.py index ac24e42..4e0fed1 100644 --- a/src/talonctl/core/__init__.py +++ b/src/talonctl/core/__init__.py @@ -26,6 +26,7 @@ from talonctl.core.plan_formatter import PlanFormatter from talonctl.core.drift_detector import DriftDetector, DriftReport, DriftItem from talonctl.core.dependency_validator import DependencyValidator, DependencyIssue +from talonctl.core.query_collection import QueryRef, collect_queries_from_templates __all__ = [ # Base provider @@ -59,4 +60,7 @@ # Dependency validation "DependencyValidator", "DependencyIssue", + # Query collection + "QueryRef", + "collect_queries_from_templates", ] diff --git a/src/talonctl/core/deployment_orchestrator.py b/src/talonctl/core/deployment_orchestrator.py index 01080ee..d33bd46 100644 --- a/src/talonctl/core/deployment_orchestrator.py +++ b/src/talonctl/core/deployment_orchestrator.py @@ -20,6 +20,7 @@ from talonctl.core.state_synchronizer import StateSynchronizer from talonctl.core.drift_detector import DriftDetector, DriftReport from talonctl.core.base_provider import ResourceAction, ResourceChange +from talonctl.core.query_collection import collect_queries_from_templates # Try to import NGSIEMClient for query validation try: @@ -34,13 +35,14 @@ @dataclass class QueryValidationResult: - """Result of FQL query validation""" + """Result of FQL query validation.""" resource_id: str resource_name: str is_valid: bool error_message: Optional[str] = None query_snippet: Optional[str] = None # First 100 chars of query + location: Optional[str] = None # Field path within template, e.g. 'search.filter' @dataclass @@ -366,7 +368,7 @@ def _validate_detection_queries( # Extract query from search config (supports both 'query' and 'filter') search_config = template.template_data.get("search", {}) - query = search_config.get("query") or search_config.get("filter") + query = search_config.get("filter") or search_config.get("query") if query: # Clean query for display @@ -863,6 +865,66 @@ def validate( return results + def validate_queries( + self, + resource_types: Optional[List[str]] = None, + tags: Optional[List[str]] = None, + names: Optional[List[str]] = None, + max_workers: int = 20, + ) -> List[QueryValidationResult]: + """ + Parse every CQL query in every query-bearing template against NGSIEM. + + Does NOT run schema validation — caller is responsible for that. + Raises ValueError (from NGSIEMClient) if credentials are missing. + """ + if not NGSIEM_CLIENT_AVAILABLE: + raise RuntimeError("NGSIEM client unavailable — cannot validate queries") + + discovered = self.template_discovery.discover_all(resource_types=resource_types, tags=tags, names=names) + refs = collect_queries_from_templates(discovered) + if not refs: + return [] + + logger.info(f"Validating {len(refs)} queries across {len(discovered)} resource types") + ngsiem_client = NGSIEMClient() + + def validate_one(ref): + try: + result = ngsiem_client.test_query_syntax(ref.query) + return QueryValidationResult( + resource_id=ref.resource_id, + resource_name=ref.resource_name, + is_valid=result["valid"], + error_message=None if result["valid"] else result.get("message"), + query_snippet=ref.query_snippet, + location=ref.location, + ) + except Exception as e: + return QueryValidationResult( + resource_id=ref.resource_id, + resource_name=ref.resource_name, + is_valid=False, + error_message=f"Validation error: {e}", + query_snippet=ref.query_snippet, + location=ref.location, + ) + + results: List[QueryValidationResult] = [] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(validate_one, r) for r in refs] + for future in as_completed(futures): + results.append(future.result()) + + valid = sum(1 for r in results if r.is_valid) + invalid = len(results) - valid + if invalid: + logger.warning(f"Query validation: {valid} valid, {invalid} invalid") + else: + logger.info(f"Query validation: all {valid} queries valid") + + return results + def sync( self, resource_types: Optional[List[str]] = None, diff --git a/src/talonctl/core/plan_formatter.py b/src/talonctl/core/plan_formatter.py index 2e245ec..c1b34b2 100644 --- a/src/talonctl/core/plan_formatter.py +++ b/src/talonctl/core/plan_formatter.py @@ -17,21 +17,11 @@ # Import ResourceChange and ResourceAction from the canonical definition in base_provider from .base_provider import ResourceChange, ResourceAction +from .deployment_orchestrator import QueryValidationResult logger = logging.getLogger(__name__) -@dataclass -class QueryValidationResult: - """Result of FQL query validation""" - - resource_id: str - resource_name: str - is_valid: bool - error_message: Optional[str] = None - query_snippet: Optional[str] = None - - @dataclass class DeploymentPlan: """Represents a complete deployment plan""" @@ -385,8 +375,7 @@ def format_validation_results(self, results: Dict[str, List[str]]) -> None: self.console.print(f"\n[red]✗ {invalid_templates} of {total_templates} templates have errors[/red]\n") def format_query_validation(self, results: List[QueryValidationResult]) -> None: - """ - Format FQL query validation results + """Format CQL query validation results. Args: results: List of query validation results @@ -399,13 +388,15 @@ def format_query_validation(self, results: List[QueryValidationResult]) -> None: valid = total - invalid self.console.print() - self.console.print(Panel.fit("[bold]FQL Query Validation[/bold]", border_style="blue")) + self.console.print(Panel.fit("[bold]Query Validation[/bold]", border_style="blue")) self.console.print() # Show invalid queries first for result in results: if not result.is_valid: self.console.print(f"[red]✗[/red] [bold]{result.resource_id}[/bold]") + if result.location: + self.console.print(f" [dim]at:[/dim] {result.location}") if result.error_message: self.console.print(f" [red]Error:[/red] {result.error_message}") if result.query_snippet: @@ -414,14 +405,14 @@ def format_query_validation(self, results: List[QueryValidationResult]) -> None: # Show valid queries (only if there are some invalid ones) if invalid > 0 and valid > 0: - self.console.print(f"[green]✓[/green] {valid} detection(s) with valid queries\n") + self.console.print(f"[green]✓[/green] {valid} queries valid\n") # Summary if invalid == 0: - self.console.print(f"[green]✓ All {total} detection queries are valid[/green]\n") + self.console.print(f"[green]✓ All {total} queries are valid[/green]\n") else: - self.console.print(f"[red]✗ {invalid} of {total} detection queries have errors[/red]\n") - self.console.print("[yellow]⚠[/yellow] Deployment will be blocked until all queries are valid.\n") + self.console.print(f"[red]✗ {invalid} of {total} queries rejected by LogScale[/red]\n") + self.console.print("[yellow]⚠[/yellow] Fix the above queries before running plan or apply.\n") def format_drift_report(self, report: Any) -> None: """ diff --git a/src/talonctl/core/query_collection.py b/src/talonctl/core/query_collection.py new file mode 100644 index 0000000..346ca49 --- /dev/null +++ b/src/talonctl/core/query_collection.py @@ -0,0 +1,89 @@ +"""Extract CQL query references from discovered templates. + +Each query-bearing resource type exposes its CQL in different fields. This +module centralises that knowledge so callers (validate --queries, future +refactors of the plan-path validator) can iterate queries uniformly without +embedding per-type field names. +""" + +from dataclasses import dataclass +from typing import Callable, Dict, List, Tuple + +from talonctl.core.template_discovery import DiscoveredTemplate + + +@dataclass +class QueryRef: + resource_type: str + resource_id: str + resource_name: str + query: str + location: str + query_snippet: str + + +def _make_snippet(query: str) -> str: + collapsed = " ".join(query.split()) + if len(collapsed) > 100: + return collapsed[:100] + "..." + return collapsed + + +def _extract_detection(template: dict) -> List[Tuple[str, str]]: + """Return list of (query, location) pairs. Detection has 0 or 1.""" + search = template.get("search") or {} + for field in ("filter", "query"): + value = search.get(field) + if isinstance(value, str) and value.strip(): + return [(value, f"search.{field}")] + return [] + + +def _extract_saved_search(template: dict) -> List[Tuple[str, str]]: + value = template.get("queryString") + if isinstance(value, str) and value.strip(): + return [(value, "queryString")] + return [] + + +def _extract_dashboard(template: dict) -> List[Tuple[str, str]]: + pairs: List[Tuple[str, str]] = [] + for widget_id, widget in (template.get("widgets") or {}).items(): + qs = widget.get("queryString") + if isinstance(qs, str) and qs.strip(): + pairs.append((qs, f"widgets.{widget_id}.queryString")) + for param_id, param in (template.get("parameters") or {}).items(): + q = param.get("query") + if isinstance(q, str) and q.strip(): + pairs.append((q, f"parameters.{param_id}.query")) + return pairs + + +_EXTRACTORS: Dict[str, Callable[[dict], List[Tuple[str, str]]]] = { + "detection": _extract_detection, + "saved_search": _extract_saved_search, + "dashboard": _extract_dashboard, +} + + +def collect_queries_from_templates( + templates_by_type: Dict[str, List[DiscoveredTemplate]], +) -> List[QueryRef]: + refs: List[QueryRef] = [] + for resource_type, templates in templates_by_type.items(): + extractor = _EXTRACTORS.get(resource_type) + if extractor is None: + continue + for template in templates: + for query, location in extractor(template.template_data): + refs.append( + QueryRef( + resource_type=resource_type, + resource_id=template.resource_id, + resource_name=template.name, + query=query, + location=location, + query_snippet=_make_snippet(query), + ) + ) + return refs diff --git a/src/talonctl/utils/ngsiem_client.py b/src/talonctl/utils/ngsiem_client.py index a7c514a..0ea6c2e 100644 --- a/src/talonctl/utils/ngsiem_client.py +++ b/src/talonctl/utils/ngsiem_client.py @@ -163,7 +163,13 @@ def execute_query(self, query: str, start_time: str = "1d", config: Optional[Que ) if response.get("status_code") != 200: - error_msg = f"Start search failed: {response.get('status_code')} - {response.get('body', {}).get('errors', 'Unknown error')}" + body = response.get("body") or {} + errors = body.get("errors") + status = response.get("status_code") + detail = ( + (errors if isinstance(errors, str) else repr(errors)) if errors else "no detail returned by API" + ) + error_msg = f"Start search failed (status={status}): {detail}" logger.error(error_msg) if attempt < config.max_retries: @@ -324,16 +330,14 @@ def execute_batch_queries( def test_query_syntax(self, query: str) -> Dict[str, Any]: """ - Test query syntax without executing full search - - Args: - query: Query to test + Test query syntax without executing full search. - Returns: - Dict with syntax validation results + The upstream parse endpoint returns a generic rejection without + structured detail. We pass any body.errors payload through verbatim + and include the HTTP status so authors know what the API actually + told us — rather than a misleading 'Unknown error' fallback. """ try: - # Validate query exactly as written - no modifications test_query = query.strip() response = self.client.start_search( @@ -346,11 +350,18 @@ def test_query_syntax(self, query: str) -> Dict[str, Any]: if response.get("status_code") == 200: return {"valid": True, "message": "Query syntax is valid"} + + body = response.get("body") or {} + errors = body.get("errors") + status = response.get("status_code") + + if errors: + detail = errors if isinstance(errors, str) else repr(errors) + message = f"LogScale rejected query (status={status}): {detail}" else: - return { - "valid": False, - "message": f"Syntax error: {response.get('body', {}).get('errors', 'Unknown error')}", - } + message = f"LogScale rejected query (status={status}, no detail returned by API)" + + return {"valid": False, "message": message} except Exception as e: return {"valid": False, "message": f"Syntax test failed: {str(e)}"} diff --git a/tests/unit/test_ngsiem_client_errors.py b/tests/unit/test_ngsiem_client_errors.py new file mode 100644 index 0000000..dd6d6ae --- /dev/null +++ b/tests/unit/test_ngsiem_client_errors.py @@ -0,0 +1,65 @@ +"""NGSIEMClient error-message honesty tests. + +The upstream parse endpoint returns a generic rejection without structured +detail. These tests pin the client to: never say 'Unknown error', always +include the status code, and pass any body.errors payload through verbatim. +""" + +from unittest.mock import MagicMock + +from talonctl.utils.ngsiem_client import NGSIEMClient + + +def _client_with_mock_falconpy(mock_response): + client = NGSIEMClient.__new__(NGSIEMClient) # bypass __init__ / creds + client.config = {} + client.debug = False + mock_falconpy = MagicMock() + mock_falconpy.start_search.return_value = mock_response + mock_falconpy.stop_search.return_value = {"status_code": 200} + client._client = mock_falconpy + return client + + +def test_valid_query_returns_valid(): + client = _client_with_mock_falconpy( + {"status_code": 200, "body": {"id": "abc", "resources": {"id": "abc"}}, "resources": {"id": "abc"}} + ) + result = client.test_query_syntax("| limit 1") + assert result["valid"] is True + + +def test_rejection_with_errors_passes_payload_through(): + errors_payload = [{"message": "unexpected token at column 17"}] + client = _client_with_mock_falconpy({"status_code": 400, "body": {"errors": errors_payload}}) + result = client.test_query_syntax("bad |") + assert result["valid"] is False + assert "Unknown error" not in result["message"] + assert "status=400" in result["message"] + assert "column 17" in result["message"] + + +def test_rejection_with_string_error_passes_through(): + client = _client_with_mock_falconpy({"status_code": 400, "body": {"errors": "bad pipe"}}) + result = client.test_query_syntax("bad |") + assert result["valid"] is False + assert "Unknown error" not in result["message"] + assert "bad pipe" in result["message"] + assert "status=400" in result["message"] + + +def test_rejection_with_empty_body_says_no_detail(): + client = _client_with_mock_falconpy({"status_code": 400, "body": {}}) + result = client.test_query_syntax("bad |") + assert result["valid"] is False + assert "Unknown error" not in result["message"] + assert "no detail returned by API" in result["message"] + assert "status=400" in result["message"] + + +def test_rejection_with_missing_body_says_no_detail(): + client = _client_with_mock_falconpy({"status_code": 400}) + result = client.test_query_syntax("bad |") + assert result["valid"] is False + assert "Unknown error" not in result["message"] + assert "no detail returned by API" in result["message"] diff --git a/tests/unit/test_orchestrator_validate_queries.py b/tests/unit/test_orchestrator_validate_queries.py new file mode 100644 index 0000000..a96d49a --- /dev/null +++ b/tests/unit/test_orchestrator_validate_queries.py @@ -0,0 +1,122 @@ +"""Tests for DeploymentOrchestrator.validate_queries.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from talonctl.core.deployment_orchestrator import DeploymentOrchestrator +from talonctl.core.template_discovery import DiscoveredTemplate + + +def _template(resource_type, name, data): + # NOTE: DiscoveredTemplate.tags is required with no default. + return DiscoveredTemplate( + resource_type=resource_type, + name=name, + file_path="/tmp/ignored.yaml", + template_data=data, + tags=[], + ) + + +@pytest.fixture +def orchestrator(): + orch = DeploymentOrchestrator.__new__(DeploymentOrchestrator) + orch.template_discovery = MagicMock() + return orch + + +def test_validate_queries_all_valid(orchestrator): + orchestrator.template_discovery.discover_all.return_value = { + "detection": [_template("detection", "a", {"search": {"filter": "A"}})], + "saved_search": [_template("saved_search", "b", {"queryString": "B"})], + } + + with patch("talonctl.core.deployment_orchestrator.NGSIEMClient") as MockClient: + MockClient.return_value.test_query_syntax.return_value = {"valid": True} + results = orchestrator.validate_queries() + + assert len(results) == 2 + assert all(r.is_valid for r in results) + assert {r.resource_id for r in results} == {"detection.a", "saved_search.b"} + + +def test_validate_queries_one_invalid_shows_location(orchestrator): + orchestrator.template_discovery.discover_all.return_value = { + "dashboard": [ + _template( + "dashboard", + "d", + { + "widgets": {"w1": {"queryString": "bad |"}}, + }, + ) + ], + } + + with patch("talonctl.core.deployment_orchestrator.NGSIEMClient") as MockClient: + MockClient.return_value.test_query_syntax.return_value = { + "valid": False, + "message": "LogScale rejected query (status=400, no detail returned by API)", + } + results = orchestrator.validate_queries() + + assert len(results) == 1 + assert results[0].is_valid is False + assert results[0].location == "widgets.w1.queryString" + assert "LogScale rejected" in results[0].error_message + + +def test_validate_queries_per_query_exception_captured(orchestrator): + orchestrator.template_discovery.discover_all.return_value = { + "detection": [ + _template("detection", "a", {"search": {"filter": "A"}}), + _template("detection", "b", {"search": {"filter": "B"}}), + ], + } + + responses = iter( + [ + {"valid": True}, + None, # sentinel — we'll raise instead + ] + ) + + def fake_test(query): + r = next(responses) + if r is None: + raise RuntimeError("boom") + return r + + with patch("talonctl.core.deployment_orchestrator.NGSIEMClient") as MockClient: + MockClient.return_value.test_query_syntax.side_effect = fake_test + results = orchestrator.validate_queries() + + assert len(results) == 2 + bad = [r for r in results if not r.is_valid] + assert len(bad) == 1 + assert "boom" in bad[0].error_message + + +def test_validate_queries_empty_fleet_returns_empty(orchestrator): + orchestrator.template_discovery.discover_all.return_value = {} + + with patch("talonctl.core.deployment_orchestrator.NGSIEMClient") as MockClient: + results = orchestrator.validate_queries() + + assert results == [] + MockClient.assert_not_called() # no queries -> no client spin-up + + +def test_validate_queries_filter_propagates(orchestrator): + orchestrator.template_discovery.discover_all.return_value = { + "detection": [_template("detection", "a", {"search": {"filter": "A"}})], + } + + with patch("talonctl.core.deployment_orchestrator.NGSIEMClient") as MockClient: + MockClient.return_value.test_query_syntax.return_value = {"valid": True} + orchestrator.validate_queries(resource_types=["detection"], names=["a"], tags=None) + + orchestrator.template_discovery.discover_all.assert_called_once_with( + resource_types=["detection"], tags=None, names=["a"] + ) diff --git a/tests/unit/test_query_collection.py b/tests/unit/test_query_collection.py new file mode 100644 index 0000000..b14ca87 --- /dev/null +++ b/tests/unit/test_query_collection.py @@ -0,0 +1,149 @@ +"""Tests for talonctl.core.query_collection.""" + +from talonctl.core.query_collection import QueryRef, collect_queries_from_templates +from talonctl.core.template_discovery import DiscoveredTemplate + + +def test_queryref_fields(): + ref = QueryRef( + resource_type="detection", + resource_id="detection.foo", + resource_name="foo", + query="#event.module=/box/", + location="search.filter", + query_snippet="#event.module=/box/", + ) + assert ref.resource_type == "detection" + assert ref.resource_id == "detection.foo" + assert ref.resource_name == "foo" + assert ref.query == "#event.module=/box/" + assert ref.location == "search.filter" + assert ref.query_snippet == "#event.module=/box/" + + +def test_collect_empty_returns_empty_list(): + assert collect_queries_from_templates({}) == [] + + +def _make(resource_type: str, name: str, data: dict) -> DiscoveredTemplate: + return DiscoveredTemplate( + resource_type=resource_type, + name=name, + file_path="/tmp/ignored.yaml", + template_data=data, + tags=[], + ) + + +def test_snippet_collapses_newlines_and_truncates(): + from talonctl.core.query_collection import _make_snippet + + short = _make_snippet(" foo\nbar ") + assert short == "foo bar" + + long_query = "a" * 150 + snippet = _make_snippet(long_query) + assert len(snippet) == 103 # 100 chars + "..." + assert snippet.endswith("...") + + +def test_detection_with_filter(): + t = _make("detection", "box_new_device", {"search": {"filter": "#event.module=/box/"}}) + refs = collect_queries_from_templates({"detection": [t]}) + assert len(refs) == 1 + assert refs[0].location == "search.filter" + assert refs[0].query == "#event.module=/box/" + assert refs[0].resource_id == "detection.box_new_device" + assert refs[0].resource_type == "detection" + + +def test_detection_with_query_only(): + t = _make("detection", "foo", {"search": {"query": "#vendor=aws"}}) + refs = collect_queries_from_templates({"detection": [t]}) + assert len(refs) == 1 + assert refs[0].location == "search.query" + + +def test_detection_prefers_filter_over_query(): + t = _make("detection", "foo", {"search": {"filter": "A", "query": "B"}}) + refs = collect_queries_from_templates({"detection": [t]}) + assert len(refs) == 1 + assert refs[0].query == "A" + assert refs[0].location == "search.filter" + + +def test_detection_without_query_is_skipped(): + t = _make("detection", "foo", {"search": {}}) + assert collect_queries_from_templates({"detection": [t]}) == [] + + t2 = _make("detection", "foo", {"search": {"filter": " "}}) + assert collect_queries_from_templates({"detection": [t2]}) == [] + + +def test_saved_search_with_query_string(): + t = _make("saved_search", "slow_login", {"queryString": "user=* | count()"}) + refs = collect_queries_from_templates({"saved_search": [t]}) + assert len(refs) == 1 + assert refs[0].location == "queryString" + assert refs[0].query == "user=* | count()" + assert refs[0].resource_type == "saved_search" + + +def test_saved_search_without_query_skipped(): + t = _make("saved_search", "foo", {"queryString": ""}) + assert collect_queries_from_templates({"saved_search": [t]}) == [] + + t2 = _make("saved_search", "foo", {}) + assert collect_queries_from_templates({"saved_search": [t2]}) == [] + + +def test_dashboard_widgets_and_parameters_fan_out(): + template_data = { + "widgets": { + "top_ips": {"type": "query", "queryString": "sourceIPAddress=* | groupBy(ip)"}, + "top_users": {"type": "query", "queryString": "user=* | groupBy(user)"}, + "header": {"type": "markdown", "queryString": ""}, + }, + "parameters": { + "region": {"query": "region=*"}, + "empty": {"query": ""}, + }, + } + t = _make("dashboard", "threat_hunting", template_data) + refs = collect_queries_from_templates({"dashboard": [t]}) + assert len(refs) == 3 + + locations = sorted(r.location for r in refs) + assert locations == [ + "parameters.region.query", + "widgets.top_ips.queryString", + "widgets.top_users.queryString", + ] + + for r in refs: + assert r.resource_type == "dashboard" + assert r.resource_id == "dashboard.threat_hunting" + + +def test_dashboard_no_widgets_or_params(): + t = _make("dashboard", "empty", {}) + assert collect_queries_from_templates({"dashboard": [t]}) == [] + + +def test_query_less_types_return_empty(): + for resource_type in ("workflow", "lookup_file", "rtr_script", "rtr_put_file"): + t = _make(resource_type, "foo", {"name": "foo"}) + assert collect_queries_from_templates({resource_type: [t]}) == [], resource_type + + +def test_mixed_types_only_include_known(): + detection = _make("detection", "d1", {"search": {"filter": "A"}}) + workflow = _make("workflow", "wf1", {"name": "wf1"}) + refs = collect_queries_from_templates( + { + "detection": [detection], + "workflow": [workflow], + } + ) + assert len(refs) == 1 + assert refs[0].resource_type == "detection" diff --git a/tests/unit/test_validate_cli.py b/tests/unit/test_validate_cli.py new file mode 100644 index 0000000..d9903d0 --- /dev/null +++ b/tests/unit/test_validate_cli.py @@ -0,0 +1,123 @@ +"""Tests for the talonctl validate CLI, including --queries.""" + +from unittest.mock import MagicMock, patch + +from click.testing import CliRunner + +from talonctl.cli import cli +from talonctl.core.deployment_orchestrator import QueryValidationResult + + +def _fake_orchestrator(validation_results=None, query_results=None, queries_raise=None): + orch = MagicMock() + orch.validate.return_value = validation_results or {} + if queries_raise is not None: + orch.validate_queries.side_effect = queries_raise + else: + orch.validate_queries.return_value = query_results or [] + return orch + + +def test_validate_schema_clean_no_queries_flag(): + runner = CliRunner() + with patch("talonctl.commands.validate.init_orchestrator") as init_orch: + init_orch.return_value = _fake_orchestrator(validation_results={"detection.a": []}) + result = runner.invoke(cli, ["validate"]) + assert result.exit_code == 0 + init_orch.return_value.validate_queries.assert_not_called() + + +def test_validate_schema_errors_short_circuit_before_queries(): + runner = CliRunner() + with patch("talonctl.commands.validate.load_credentials", return_value={"falcon_client_id": "x"}): + with patch("talonctl.commands.validate.init_orchestrator") as init_orch: + init_orch.return_value = _fake_orchestrator( + validation_results={"detection.a": ["missing field 'name'"]}, + ) + result = runner.invoke(cli, ["validate", "--queries"]) + assert result.exit_code == 1 + init_orch.return_value.validate_queries.assert_not_called() + + +def test_validate_queries_all_valid(): + runner = CliRunner() + q_results = [ + QueryValidationResult( + resource_id="detection.a", + resource_name="a", + is_valid=True, + query_snippet="A", + location="search.filter", + ), + ] + with patch("talonctl.commands.validate.load_credentials", return_value={"falcon_client_id": "x"}): + with patch("talonctl.commands.validate.init_orchestrator") as init_orch: + init_orch.return_value = _fake_orchestrator( + validation_results={"detection.a": []}, + query_results=q_results, + ) + result = runner.invoke(cli, ["validate", "--queries"]) + assert result.exit_code == 0 + init_orch.return_value.validate_queries.assert_called_once() + + +def test_validate_queries_one_invalid_exits_nonzero(): + runner = CliRunner() + q_results = [ + QueryValidationResult( + resource_id="dashboard.d", + resource_name="d", + is_valid=False, + error_message="LogScale rejected query (status=400, no detail returned by API)", + query_snippet="bad |", + location="widgets.w1.queryString", + ), + ] + with patch("talonctl.commands.validate.load_credentials", return_value={"falcon_client_id": "x"}): + with patch("talonctl.commands.validate.init_orchestrator") as init_orch: + init_orch.return_value = _fake_orchestrator( + validation_results={"dashboard.d": []}, + query_results=q_results, + ) + result = runner.invoke(cli, ["validate", "--queries"]) + assert result.exit_code == 1 + assert "widgets.w1.queryString" in result.output + + +def test_validate_queries_ngsiem_client_value_error_maps_to_credentials_message(): + """NGSIEMClient raises ValueError (e.g. creds exist but invalid) -> friendly message.""" + runner = CliRunner() + with patch("talonctl.commands.validate.load_credentials", return_value={"falcon_client_id": "x"}): + with patch("talonctl.commands.validate.init_orchestrator") as init_orch: + init_orch.return_value = _fake_orchestrator( + validation_results={"detection.a": []}, + queries_raise=ValueError("Failed to load CrowdStrike credentials"), + ) + result = runner.invoke(cli, ["validate", "--queries"]) + assert result.exit_code == 1 + assert "--queries requires configured credentials" in result.output + assert "talonctl auth setup" in result.output + + +def test_validate_queries_no_credentials_file(): + """Real code path: load_credentials returns None -> documented message before init_orchestrator.""" + runner = CliRunner() + with patch("talonctl.commands.validate.load_credentials", return_value=None) as mock_load: + result = runner.invoke(cli, ["validate", "--queries"]) + assert result.exit_code == 1 + assert "--queries requires configured credentials" in result.output + assert "talonctl auth setup" in result.output + mock_load.assert_called_once() + + +def test_validate_queries_short_flag(): + runner = CliRunner() + with patch("talonctl.commands.validate.load_credentials", return_value={"falcon_client_id": "x"}): + with patch("talonctl.commands.validate.init_orchestrator") as init_orch: + init_orch.return_value = _fake_orchestrator( + validation_results={"detection.a": []}, + query_results=[], + ) + result = runner.invoke(cli, ["validate", "-Q"]) + assert result.exit_code == 0 + init_orch.return_value.validate_queries.assert_called_once()