From af53c5154a169caace5526384c0e26c15715fdee Mon Sep 17 00:00:00 2001 From: Joey Riches Date: Fri, 19 Jun 2026 17:37:17 +0100 Subject: [PATCH] common/CI: Add monitoring formatting checks --- common/CI/package_checks.py | 309 +++++++++++++++++++++++++++++++++++- 1 file changed, 302 insertions(+), 7 deletions(-) diff --git a/common/CI/package_checks.py b/common/CI/package_checks.py index c0732566058..43217021503 100755 --- a/common/CI/package_checks.py +++ b/common/CI/package_checks.py @@ -11,14 +11,9 @@ from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum -from typing import ( - Any, - Callable, - TextIO, - final, - override, -) +from typing import Any, Callable, NotRequired, TextIO, TypedDict, final, override from urllib import request +from urllib.parse import urlparse from xml.etree import ElementTree from ruamel.yaml import YAML @@ -86,6 +81,76 @@ def files(self) -> list[str]: return [str(element.text) for element in self._xml.findall(".//Path")] +class Releases(TypedDict): + """Represents the Releases section of a Monitoring YAML file.""" + + id: NotRequired[int | None] # Note: actually required + rss: NotRequired[str | None] # Note: actually required + ignore: NotRequired[list[str] | None] + + +class CPE(TypedDict): + """Represents the CPE section of a Monitoring YAML file.""" + + vendor: str | None + product: str | None + + +class Security(TypedDict): + """Represents the Security section of a Monitoring YAML file.""" + + cpe: list[CPE] | None + ignore: list[str] | None + + +class MonitoringYAML: + """Represents a Monitoring YAML file.""" + + def __init__(self, stream: Any): + yaml = YAML(typ="safe", pure=True) + yaml.default_flow_style = False + self._data = dict(yaml.load(stream)) + + @property + def releases(self) -> Releases | None: + return self._data.get("releases") + + @property + def release_id(self) -> int | None: + releases = self.releases + if releases: + return releases.get("id") + return None + + @property + def release_ignore(self) -> list[str] | None: + releases = self.releases + if releases and releases.get("ignore"): + return releases.get("ignore") + return None + + @property + def security(self) -> Security | None: + return self._data.get("security") + + @property + def cpe(self) -> list[CPE] | None: + security = self.security + if security: + return security.get("cpe") + return None + + @property + def security_ignore(self) -> list[str] | None: + security = self.security + if security and security.get("ignore"): + return security.get("ignore") + return None + + def get(self, key: str, default: Any = None) -> Any: + return self._data.get(key, default) + + @dataclass class FreezeConfig: start: datetime | None @@ -284,6 +349,7 @@ def _record(self) -> logging.LogRecord: class PullRequestCheck: + _monitoring_files: list[str] = ["monitoring.yaml"] _package_files: list[str] = ["package.yml"] _pspec_files: list[str] = ["pspec_x86_64.xml"] _two_letter_dirs: list[str] = ["py"] @@ -308,6 +374,10 @@ def config(self) -> Config: return self._config + @property + def monitoring_files(self) -> list[str]: + return self.filter_files(*self._monitoring_files) + @property def package_files(self) -> list[str]: return self.filter_files(*self._package_files) @@ -332,6 +402,13 @@ def _read(self, path: str) -> str: def _exists(self, path: str) -> bool: return os.path.exists(self._path(path)) + def load_monitoring_yml(self, file: str) -> MonitoringYAML: + with self._open(file) as f: + return MonitoringYAML(f) + + def load_monitoring_yml_from_commit(self, ref: str, file: str) -> MonitoringYAML: + return MonitoringYAML(self.git.file_from_commit(ref, file)) + def load_package_yml(self, file: str) -> PackageYML: with self._open(file) as f: return PackageYML(f) @@ -495,6 +572,223 @@ def _has_monitoring_yml(self, file: str) -> bool: return self._exists(os.path.join(os.path.dirname(file), "monitoring.yaml")) +class MonitoringFormat(PullRequestCheck): + _error_required_sections: str = ( + "monitoring.yaml must contain required sections: releases and security" + ) + _error_cpe_format: str = "security.cpe must be a list or null (~)" + _error_cpe_entry: str = "Each CPE entry must have both 'vendor' and 'product' fields with non-null values" + _level: Level = Level.ERROR + + def _yml_file(self, file: str) -> MonitoringYAML: + return self.load_monitoring_yml(file) + + def _is_valid_url(self, url: str | None) -> bool: + result = urlparse(url) + return bool(result.scheme and result.netloc) + + @override + def run(self) -> list[Result]: + package_files = self.monitoring_files + results: list[Result] = [] + + for file in package_files: + monitoring = self._yml_file(file) + + # Check required sections + results.extend(self._check_required_sections(file, monitoring)) + + # Check security section + results.extend(self._check_security_section(file, monitoring)) + + # Check releases section + results.extend(self._check_releases_section(file, monitoring)) + + return results + + def _check_required_sections( + self, file: str, monitoring: MonitoringYAML + ) -> list[Result]: + results: list[Result] = [] + if not isinstance(monitoring.releases, dict) and not isinstance( + monitoring.security, dict + ): + results.append( + Result( + message=self._error_required_sections, file=file, level=self._level + ) + ) + return results + + def _check_security_section( + self, file: str, monitoring: MonitoringYAML + ) -> list[Result]: + results: list[Result] = [] + + # Ensure the 'cpe' key exists in the security section + if not isinstance(monitoring.cpe, list) and monitoring.cpe is not None: + results.append( + Result( + message=self._error_cpe_format, + file=file, + level=self._level, + line=self.file_line(file, r"^security\s*:"), + ) + ) + + if monitoring.cpe: + results.extend(self._check_cpe_entries(file, monitoring.cpe)) + results.extend( + self._check_security_ignore_patterns(file, monitoring.security_ignore) + ) + + return results + + def _check_security_ignore_patterns( + self, file: str, ignore_patterns: list[str] | None + ) -> list[Result]: + results: list[Result] = [] + if not ignore_patterns: + return results + + if not all(isinstance(pattern, str) for pattern in ignore_patterns): + results.append( + Result( + message="security.ignore must contain string patterns", + file=file, + level=self._level, + line=self.file_line(file, r"^ ignore\s*:"), + ) + ) + else: + # Check that all patterns begin with CVE- + invalid_patterns = [ + pattern for pattern in ignore_patterns if not pattern.startswith("CVE-") + ] + if invalid_patterns: + results.append( + Result( + message=f"security.ignore patterns must begin with 'CVE-': {', '.join(invalid_patterns)}", + file=file, + level=self._level, + line=self.file_line(file, r"^ security\.ignore\s*:"), + ) + ) + return results + + def _check_cpe_entries(self, file: str, cpe_entries: list[CPE]) -> list[Result]: + results: list[Result] = [] + for item in cpe_entries: + if item.get("vendor") is None or item.get("product") is None: + results.append( + Result( + message=self._error_cpe_entry, + file=file, + level=self._level, + line=self.file_line(file, r"^ cpe\s*:"), + ) + ) + return results + + def _check_releases_section( + self, file: str, monitoring: MonitoringYAML + ) -> list[Result]: + results: list[Result] = [] + + if monitoring.releases is not None: + results.extend(self._check_releases_id(file, monitoring.releases)) + results.extend(self._check_releases_rss(file, monitoring.releases)) + results.extend( + self._check_releases_ignore_patterns(file, monitoring.release_ignore) + ) + return results + + def _check_releases_ignore_patterns( + self, file: str, ignore_patterns: list[str] | None + ) -> list[Result]: + results: list[Result] = [] + if ignore_patterns and not all( + isinstance(pattern, str) for pattern in ignore_patterns + ): + results.append( + Result( + message="releases.ignore must contain string patterns", + file=file, + level=self._level, + line=self.file_line(file, r"^ ignore\s*:"), + ) + ) + return results + + def _check_releases_rss(self, file: str, releases: Releases) -> list[Result]: + results: list[Result] = [] + if "rss" not in releases: + results.append( + Result( + message="releases section must contain an `rss` key", + file=file, + level=self._level, + line=self.file_line(file, r"^releases\s*:"), + ) + ) + elif releases.get("rss") is None: + # The key exists but has a null value + results.append( + Result( + message="releases.rss is set to null, it should point to a rss feed", + file=file, + level=Level.WARNING, + line=self.file_line(file, r"^\s+rss\s*:"), + ) + ) + elif releases.get("rss") is not None and not self._is_valid_url( + releases.get("rss") + ): + results.append( + Result( + message="releases.rss must contain a valid URL", + file=file, + level=self._level, + line=self.file_line(file, r"^\s+rss\s*:"), + ) + ) + return results + + def _check_releases_id(self, file: str, releases: Releases) -> list[Result]: + results: list[Result] = [] + if "id" not in releases: + results.append( + Result( + message="releases section must contain an `id` key", + file=file, + level=self._level, + line=self.file_line(file, r"^releases\s*:"), + ) + ) + elif releases.get("id") is None: + # The key exists but has a null value + results.append( + Result( + message="releases.id is set to null, it should have a numeric value", + file=file, + level=Level.WARNING, + line=self.file_line(file, r"^\s+id\s*:"), + ) + ) + elif releases.get("id") is not None: + id_value = releases["id"] + if id_value is not None and not isinstance(id_value, int): + results.append( + Result( + message="releases.id must be a number", + file=file, + level=self._level, + line=self.file_line(file, r"^\s+id\s*:"), + ) + ) + return results + + class License(PullRequestCheck): _error: str = "Package is missing license files" _level: Level = Level.WARNING @@ -1012,6 +1306,7 @@ class Checker: FrozenPackage, Homepage, Monitoring, + MonitoringFormat, License, PackageBumped, PackageDependenciesOrder,