Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 302 additions & 7 deletions common/CI/package_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,9 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import (
Any,
Callable,
TextIO,
final,
override,
)
from typing import Any, Callable, NotRequired, TextIO, TypedDict, final, override
from urllib import request
from urllib.parse import urlparse
from xml.etree import ElementTree

from ruamel.yaml import YAML
Expand Down Expand Up @@ -86,6 +81,76 @@ def files(self) -> list[str]:
return [str(element.text) for element in self._xml.findall(".//Path")]


class Releases(TypedDict):
"""Represents the Releases section of a Monitoring YAML file."""

id: NotRequired[int | None] # Note: actually required
rss: NotRequired[str | None] # Note: actually required
ignore: NotRequired[list[str] | None]


class CPE(TypedDict):
"""Represents the CPE section of a Monitoring YAML file."""

vendor: str | None
product: str | None


class Security(TypedDict):
"""Represents the Security section of a Monitoring YAML file."""

cpe: list[CPE] | None
ignore: list[str] | None


class MonitoringYAML:
"""Represents a Monitoring YAML file."""

def __init__(self, stream: Any):
yaml = YAML(typ="safe", pure=True)
yaml.default_flow_style = False
self._data = dict(yaml.load(stream))

@property
def releases(self) -> Releases | None:
return self._data.get("releases")

@property
def release_id(self) -> int | None:
releases = self.releases
if releases:
return releases.get("id")
return None

@property
def release_ignore(self) -> list[str] | None:
releases = self.releases
if releases and releases.get("ignore"):
return releases.get("ignore")
return None

@property
def security(self) -> Security | None:
return self._data.get("security")

@property
def cpe(self) -> list[CPE] | None:
security = self.security
if security:
return security.get("cpe")
return None

@property
def security_ignore(self) -> list[str] | None:
security = self.security
if security and security.get("ignore"):
return security.get("ignore")
return None

def get(self, key: str, default: Any = None) -> Any:
return self._data.get(key, default)


@dataclass
class FreezeConfig:
start: datetime | None
Expand Down Expand Up @@ -284,6 +349,7 @@ def _record(self) -> logging.LogRecord:


class PullRequestCheck:
_monitoring_files: list[str] = ["monitoring.yaml"]
_package_files: list[str] = ["package.yml"]
_pspec_files: list[str] = ["pspec_x86_64.xml"]
_two_letter_dirs: list[str] = ["py"]
Expand All @@ -308,6 +374,10 @@ def config(self) -> Config:

return self._config

@property
def monitoring_files(self) -> list[str]:
return self.filter_files(*self._monitoring_files)

@property
def package_files(self) -> list[str]:
return self.filter_files(*self._package_files)
Expand All @@ -332,6 +402,13 @@ def _read(self, path: str) -> str:
def _exists(self, path: str) -> bool:
return os.path.exists(self._path(path))

def load_monitoring_yml(self, file: str) -> MonitoringYAML:
with self._open(file) as f:
return MonitoringYAML(f)

def load_monitoring_yml_from_commit(self, ref: str, file: str) -> MonitoringYAML:
return MonitoringYAML(self.git.file_from_commit(ref, file))

def load_package_yml(self, file: str) -> PackageYML:
with self._open(file) as f:
return PackageYML(f)
Expand Down Expand Up @@ -495,6 +572,223 @@ def _has_monitoring_yml(self, file: str) -> bool:
return self._exists(os.path.join(os.path.dirname(file), "monitoring.yaml"))


class MonitoringFormat(PullRequestCheck):
_error_required_sections: str = (
"monitoring.yaml must contain required sections: releases and security"
)
_error_cpe_format: str = "security.cpe must be a list or null (~)"
_error_cpe_entry: str = "Each CPE entry must have both 'vendor' and 'product' fields with non-null values"
_level: Level = Level.ERROR

def _yml_file(self, file: str) -> MonitoringYAML:
return self.load_monitoring_yml(file)

def _is_valid_url(self, url: str | None) -> bool:
result = urlparse(url)
return bool(result.scheme and result.netloc)

@override
def run(self) -> list[Result]:
package_files = self.monitoring_files
results: list[Result] = []

for file in package_files:
monitoring = self._yml_file(file)

# Check required sections
results.extend(self._check_required_sections(file, monitoring))

# Check security section
results.extend(self._check_security_section(file, monitoring))

# Check releases section
results.extend(self._check_releases_section(file, monitoring))

return results

def _check_required_sections(
self, file: str, monitoring: MonitoringYAML
) -> list[Result]:
results: list[Result] = []
if not isinstance(monitoring.releases, dict) and not isinstance(
monitoring.security, dict
):
results.append(
Result(
message=self._error_required_sections, file=file, level=self._level
)
)
return results

def _check_security_section(
self, file: str, monitoring: MonitoringYAML
) -> list[Result]:
results: list[Result] = []

# Ensure the 'cpe' key exists in the security section
if not isinstance(monitoring.cpe, list) and monitoring.cpe is not None:
results.append(
Result(
message=self._error_cpe_format,
file=file,
level=self._level,
line=self.file_line(file, r"^security\s*:"),
)
)

if monitoring.cpe:
results.extend(self._check_cpe_entries(file, monitoring.cpe))
results.extend(
self._check_security_ignore_patterns(file, monitoring.security_ignore)
)

return results

def _check_security_ignore_patterns(
self, file: str, ignore_patterns: list[str] | None
) -> list[Result]:
results: list[Result] = []
if not ignore_patterns:
return results

if not all(isinstance(pattern, str) for pattern in ignore_patterns):
results.append(
Result(
message="security.ignore must contain string patterns",
file=file,
level=self._level,
line=self.file_line(file, r"^ ignore\s*:"),
)
)
else:
# Check that all patterns begin with CVE-
invalid_patterns = [
pattern for pattern in ignore_patterns if not pattern.startswith("CVE-")
]
if invalid_patterns:
results.append(
Result(
message=f"security.ignore patterns must begin with 'CVE-': {', '.join(invalid_patterns)}",
file=file,
level=self._level,
line=self.file_line(file, r"^ security\.ignore\s*:"),
)
)
return results

def _check_cpe_entries(self, file: str, cpe_entries: list[CPE]) -> list[Result]:
results: list[Result] = []
for item in cpe_entries:
if item.get("vendor") is None or item.get("product") is None:
results.append(
Result(
message=self._error_cpe_entry,
file=file,
level=self._level,
line=self.file_line(file, r"^ cpe\s*:"),
)
)
return results

def _check_releases_section(
self, file: str, monitoring: MonitoringYAML
) -> list[Result]:
results: list[Result] = []

if monitoring.releases is not None:
results.extend(self._check_releases_id(file, monitoring.releases))
results.extend(self._check_releases_rss(file, monitoring.releases))
results.extend(
self._check_releases_ignore_patterns(file, monitoring.release_ignore)
)
return results

def _check_releases_ignore_patterns(
self, file: str, ignore_patterns: list[str] | None
) -> list[Result]:
results: list[Result] = []
if ignore_patterns and not all(
isinstance(pattern, str) for pattern in ignore_patterns
):
results.append(
Result(
message="releases.ignore must contain string patterns",
file=file,
level=self._level,
line=self.file_line(file, r"^ ignore\s*:"),
)
)
return results

def _check_releases_rss(self, file: str, releases: Releases) -> list[Result]:
results: list[Result] = []
if "rss" not in releases:
results.append(
Result(
message="releases section must contain an `rss` key",
file=file,
level=self._level,
line=self.file_line(file, r"^releases\s*:"),
)
)
elif releases.get("rss") is None:
# The key exists but has a null value
results.append(
Result(
message="releases.rss is set to null, it should point to a rss feed",
file=file,
level=Level.WARNING,
line=self.file_line(file, r"^\s+rss\s*:"),
)
)
elif releases.get("rss") is not None and not self._is_valid_url(
releases.get("rss")
):
results.append(
Result(
message="releases.rss must contain a valid URL",
file=file,
level=self._level,
line=self.file_line(file, r"^\s+rss\s*:"),
)
)
return results

def _check_releases_id(self, file: str, releases: Releases) -> list[Result]:
results: list[Result] = []
if "id" not in releases:
results.append(
Result(
message="releases section must contain an `id` key",
file=file,
level=self._level,
line=self.file_line(file, r"^releases\s*:"),
)
)
elif releases.get("id") is None:
# The key exists but has a null value
results.append(
Result(
message="releases.id is set to null, it should have a numeric value",
file=file,
level=Level.WARNING,
line=self.file_line(file, r"^\s+id\s*:"),
)
)
elif releases.get("id") is not None:
id_value = releases["id"]
if id_value is not None and not isinstance(id_value, int):
results.append(
Result(
message="releases.id must be a number",
file=file,
level=self._level,
line=self.file_line(file, r"^\s+id\s*:"),
)
)
return results


class License(PullRequestCheck):
_error: str = "Package is missing license files"
_level: Level = Level.WARNING
Expand Down Expand Up @@ -1012,6 +1306,7 @@ class Checker:
FrozenPackage,
Homepage,
Monitoring,
MonitoringFormat,
License,
PackageBumped,
PackageDependenciesOrder,
Expand Down
Loading