diff --git a/src/access/profiling/cylc_manager.py b/src/access/profiling/cylc_manager.py index 9247ec1..69eb979 100644 --- a/src/access/profiling/cylc_manager.py +++ b/src/access/profiling/cylc_manager.py @@ -1,12 +1,16 @@ # Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. # SPDX-License-Identifier: Apache-2.0 +import getpass import logging +import os +import shutil +import subprocess from abc import ABC, abstractmethod from pathlib import Path from access.profiling.cylc_parser import CylcDBReader, CylcProfilingParser -from access.profiling.experiment import ProfilingLog +from access.profiling.experiment import ProfilingExperiment, ProfilingExperimentStatus, ProfilingLog from access.profiling.manager import ProfilingManager from access.profiling.parser import ProfilingParser @@ -38,36 +42,169 @@ def known_parsers(self) -> dict[str, ProfilingParser]: """ def parse_ncpus(self, path: Path, run_path: Path | None = None) -> int: - # this is a symlink - config_path = path / "log/rose-suite-run.conf" + # both the run and original config will store cpu information + config_paths = [] + if run_path is not None: + config_paths.append(run_path / "log/rose-suite-run.conf") + config_paths.append(path / "rose-suite.conf") - if not config_path.is_file(): - raise FileNotFoundError(f"Could not find suitable config file in {config_path}") + config_path = next((candidate for candidate in config_paths if candidate.is_file()), None) + if config_path is None: + tried = ", ".join(str(p) for p in config_paths) + raise FileNotFoundError(f"Could not find suitable config file. Tried: {tried}") for line in config_path.read_text().splitlines(): - if not line.startswith("!!"): - keypair = line.split("=") - if keypair[0].strip() == self._layout_variable: - layout = keypair[1].split(",") + if not line.startswith("!!") and "=" in line: + key, value = line.split("=", 1) + if key.strip() == self._layout_variable: + layout = value.split(",") return int(layout[0].strip()) * int(layout[1].strip()) raise ValueError(f"Cannot find layout key, {self._layout_variable}, in {config_path}.") + def add_rose_experiment(self, rose: str, project: str | None = None) -> None: + """Adds the given rose as an experiment to this manager. + + Args: + rose (str): The rose to add as an experiment. + project (str): The project to use to look for the results. If no project is provided, + the project in the PROJECT environment variable will be used. + + Raises: + ValueError: If no project is specified and the PROJECT environment variable is not set, + or if the experiment path does not exist. + """ + project = project or os.environ.get("PROJECT") + if project is None: + raise ValueError("No project specified and PROJECT environment variable is not set.") + + experiment_path = self.work_dir / rose + if not experiment_path.is_dir(): + raise ValueError(f"Experiment path '{experiment_path}' does not exist or is not a directory.") + + run_path = Path("/scratch") / project / getpass.getuser() / "cylc-run" / rose + if not run_path.is_dir(): + logger.warning(f"Run path '{run_path}' does not exist. Archiving will only include experiment files.") + run_path = None + + self.experiments[rose] = ProfilingExperiment(path=experiment_path, run_path=run_path) + self.experiments[rose].status = ProfilingExperimentStatus.DONE + + def run_experiments(self) -> None: + """Runs Rose Cylc experiments via `rose suite-run` for profiling data generation.""" + + to_run = {name: exp for name, exp in self.experiments.items() if exp.status == ProfilingExperimentStatus.NEW} + + if not to_run: + logger.info("No new experiments to run. Will skip execution.") + return + + for name, exp in to_run.items(): + logger.info(f"Running experiment '{name}' via rose suite-run in '{exp.path}'.") + try: + result = subprocess.run(["rose", "suite-run"], cwd=exp.path, check=True, capture_output=True, text=True) + except subprocess.CalledProcessError as e: + for line in e.stdout.splitlines(): + logger.info(f"[{name}] {line}") + for line in e.stderr.splitlines(): + logger.error(f"[{name}] {line}") + raise + for line in result.stdout.splitlines(): + logger.info(f"[{name}] {line}") + for line in result.stderr.splitlines(): + logger.warning(f"[{name}] {line}") + exp.status = ProfilingExperimentStatus.RUNNING + + # TODO: properly detect when running experiments have completed rather than marking them done immediately. + for exp in self.experiments.values(): + if exp.status == ProfilingExperimentStatus.RUNNING: + exp.status = ProfilingExperimentStatus.DONE + + def delete_experiments( + self, + experiments: list[str] | None = None, + all_experiments: bool = False, + dry_run: bool = False, + ) -> None: + """Deletes Rose Cylc experiments from the work directory and removes them from the manager. + + Args: + experiments (list[str] | None): List of experiment names to delete. + all_experiments (bool): If True, deletes all experiments managed by this instance. + dry_run (bool): If True, logs what would be deleted without making any changes. Defaults to False. + + Raises: + ValueError: If both experiments and all_experiments are specified, or neither is. + KeyError: If any experiment name is not managed by this instance. + """ + names_to_delete = self._resolve_names(experiments, all_experiments) + + for name in names_to_delete: + exp = self.experiments[name] + exp_path = exp.path + run_path = exp.run_path + if dry_run: + logger.info(f"Dry run: would delete experiment directory '{exp_path}' and run directory '{run_path}'.") + continue + if exp_path.is_dir(): + logger.info(f"Deleting experiment directory '{exp_path}'.") + shutil.rmtree(exp_path) + else: + logger.warning(f"Experiment directory '{exp_path}' does not exist. Skipping deletion.") + if run_path is not None: + if run_path.is_dir(): + logger.info(f"Deleting run directory '{run_path}'.") + shutil.rmtree(run_path) + else: + logger.warning(f"Run directory '{run_path}' does not exist. Skipping deletion.") + del self.experiments[name] + + def archive_experiments( + self, + exclude_dirs: list[str] | None = None, + exclude_files: list[str] | None = None, + follow_symlinks: bool = False, + overwrite: bool = False, + ) -> None: + """Archives completed experiments to the specified archive path. + + Args: + exclude_dirs (list[str] | None): Directory patterns to exclude when archiving. Defaults to + [".svn", "share"] if not provided. + exclude_files (list[str] | None): File patterns to exclude when archiving. Defaults to + ["*.nc"] if not provided. + follow_symlinks (bool): Whether to follow symlinks when archiving. Defaults to False. + overwrite (bool): Whether to overwrite existing archives. Defaults to False. + """ + if exclude_dirs is None: + exclude_dirs = [".svn", "share"] + if exclude_files is None: + exclude_files = ["*.nc"] + super().archive_experiments( + exclude_dirs=exclude_dirs, + exclude_files=exclude_files, + follow_symlinks=follow_symlinks, + overwrite=overwrite, + ) + def profiling_logs(self, path: Path, run_path: Path | None = None) -> dict[str, ProfilingLog]: """Returns all profiling logs from the specified path. Args: path (Path): Path to the experiment directory. - run_path (Path | None): Optional path to a separate runs directory. + run_path (Path | None): Path to the Cylc run directory. Returns: dict[str, ProfilingLog]: Dictionary of profiling logs. """ + if run_path is None: + raise ValueError("Cylc run_path is required to locate profiling logs.") + logs = {} # setup log paths - suite_log = path / "log/suite/log" # cylc log file - cylcdb = path / "cylc-suite.db" # database with task runtimes - jobdir = path / "log/job" # where task logs are stored + suite_log = run_path / "log/suite/log" # cylc log file + cylcdb = run_path / "cylc-suite.db" # database with task runtimes + jobdir = run_path / "log/job" # where task logs are stored logs["cylc_suite_log"] = ProfilingLog(suite_log, CylcProfilingParser()) # cylcdb.read_text = lambda x: x # hack to make log work diff --git a/src/access/profiling/manager.py b/src/access/profiling/manager.py index e87328b..d82e443 100644 --- a/src/access/profiling/manager.py +++ b/src/access/profiling/manager.py @@ -158,6 +158,34 @@ def delete_experiment(self, name: str) -> None: else: logger.warning(f"Experiment '{name}' not found; cannot delete.") + def _resolve_names(self, names: list[str] | None, all_experiments: bool) -> set[str]: + """Validates and resolves a selection of experiment names to operate on. + + Args: + names: Explicit list of experiment names, or None when all_experiments is True. + all_experiments: If True, selects all managed experiments. + + Returns: + Set of experiment names to operate on. + + Raises: + ValueError: If both names and all_experiments are provided, or neither is. + KeyError: If any specified name is not managed by this instance. + """ + if all_experiments and names is not None: + raise ValueError("Pass either names=[...] or all_experiments=True, not both.") + if not all_experiments and not names: + raise ValueError("No experiments specified. Pass either names=[...] or all_experiments=True.") + existing = set(self.experiments.keys()) + to_process = existing if all_experiments else set(names) + unmanaged = [e for e in to_process if e not in existing] + if unmanaged: + raise KeyError( + f"Experiments {unmanaged} are not managed by this manager " + f"(existing: {existing}). Please check the names and try again." + ) + return to_process + def parse_profiling_data(self): """Parses profiling data from the experiments.""" self.data = {} diff --git a/src/access/profiling/payu_manager.py b/src/access/profiling/payu_manager.py index 95b7be4..097bf05 100644 --- a/src/access/profiling/payu_manager.py +++ b/src/access/profiling/payu_manager.py @@ -227,21 +227,8 @@ def delete_experiments( dry_run (bool): If True, performs a dry run without deleting files. Defaults to False. remove_repo_dir (bool): If True, removes the base repository directory if no branches are using it. """ - if all_branches and branches is not None: - raise ValueError("Pass either branches=[...] or all_branches=True") + branches_to_delete = self._resolve_names(branches, all_branches) - if not all_branches and not branches: - raise ValueError("No branches specified for delete! Pass either branches=[...] or all_branches=True") - - existing_branches = set(self.experiments.keys()) - branches_to_delete = existing_branches if all_branches else set(branches) - - unmanaged_branches = [i for i in branches_to_delete if i not in existing_branches] - if unmanaged_branches: - raise KeyError( - f"Branches {unmanaged_branches} are not managed by the PayuManager" - f" (existing branches: {existing_branches}). Please check the branch names and try again." - ) if not branches_to_delete: logger.info("No branches to delete after checking against existing branches. Will skip delete.") return diff --git a/tests/test_cylc_manager.py b/tests/test_cylc_manager.py index 122b7a7..90f4b9b 100644 --- a/tests/test_cylc_manager.py +++ b/tests/test_cylc_manager.py @@ -1,6 +1,8 @@ # Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. # SPDX-License-Identifier: Apache-2.0 +import logging +import subprocess from pathlib import Path from unittest import mock @@ -8,6 +10,8 @@ from access.profiling.cylc_manager import CylcRoseManager from access.profiling.cylc_parser import CylcDBReader, CylcProfilingParser +from access.profiling.experiment import ProfilingExperiment, ProfilingExperimentStatus +from access.profiling.manager import ProfilingManager from access.profiling.parser import ProfilingParser @@ -28,17 +32,19 @@ def manager(): def test_parse_profiling_logs(mock_path_glob, manager): """Test the parse_profiling_logs method of CylcRoseManager with missing directories.""" + run_path = Path("/fake/run_path") + # no component log files mock_path_glob.return_value = [] with pytest.raises(RuntimeError): - manager.profiling_logs(Path("/fake/path")) + manager.profiling_logs(Path("/fake/path"), run_path) mock_path_glob.assert_called_once() # component log files are present mock_path_glob.reset_mock() - mock_path_glob.return_value = [Path("/fake/path/cycle1/task1/NN/job.out")] + mock_path_glob.return_value = [Path("/fake/run_path/cycle1/task1/NN/job.out")] # return something "valid" for the cylc loc and db, but fail to read the component log. - logs = manager.profiling_logs(Path("/fake/path")) + logs = manager.profiling_logs(Path("/fake/path"), run_path) mock_path_glob.assert_called_once() assert "cylc_suite_log" in logs assert isinstance(logs["cylc_suite_log"].parser, CylcProfilingParser) @@ -48,6 +54,29 @@ def test_parse_profiling_logs(mock_path_glob, manager): assert isinstance(logs["task1_cyclecycle1_fake-parser"].parser, mock.MagicMock) +def test_profiling_logs_requires_run_path(manager): + """Cylc profiling logs live in the run directory, so run_path is required.""" + + with pytest.raises(ValueError, match="Cylc run_path is required"): + manager.profiling_logs(Path("/fake/path")) + + +def test_profiling_logs_uses_run_path(tmp_path, manager): + """Cylc runtime logs should be resolved from run_path when it is provided.""" + + exp_path = tmp_path / "experiment" + run_path = tmp_path / "runs" + job_out = run_path / "log/job/cycle1/task1/NN/job.out" + job_out.parent.mkdir(parents=True) + job_out.touch() + + logs = manager.profiling_logs(exp_path, run_path) + + assert logs["cylc_suite_log"].filepath == run_path / "log/suite/log" + assert logs["cylc_tasks"].filepath == run_path / "cylc-suite.db" + assert logs["task1_cyclecycle1_fake-parser"].filepath == job_out + + @mock.patch("access.profiling.access_models.Path.is_file") @mock.patch("access.profiling.access_models.Path.read_text") def test_parse_ncpus(mock_read_text, mock_is_file, manager): @@ -67,3 +96,242 @@ def test_parse_ncpus(mock_read_text, mock_is_file, manager): # mock presence of layout variable mock_read_text.return_value += "\n um_layout = 2,3" manager.parse_ncpus(Path("/fake/path")) + + +def test_parse_ncpus_uses_run_path(tmp_path, manager): + """The separate Cylc run directory is the source of truth when both configs exist.""" + + exp_path = tmp_path / "experiment" + run_path = tmp_path / "runs" + exp_path.mkdir() + (run_path / "log").mkdir(parents=True) + (exp_path / "rose-suite.conf").write_text("um_layout = 9,9\n") + (run_path / "log/rose-suite-run.conf").write_text("um_layout = 2,3\n") + + assert manager.parse_ncpus(exp_path, run_path) == 6 + + +@mock.patch("access.profiling.cylc_manager.getpass.getuser", return_value="fake-user") +def test_add_rose_experiment_uses_new_experiment_api(mock_getuser, manager): + """add_rose_experiment should populate ProfilingExperiment.path and run_path.""" + + rose = "u-aa123" + experiment_path = manager.work_dir / rose + run_path = Path("/scratch") / "proj" / "fake-user" / "cylc-run" / rose + + with mock.patch("access.profiling.cylc_manager.Path.is_dir", autospec=True) as mock_is_dir: + mock_is_dir.side_effect = lambda path: path in {experiment_path, run_path} + manager.add_rose_experiment(rose, project="proj") + + assert manager.experiments[rose].path == experiment_path + assert manager.experiments[rose].run_path == run_path + assert manager.experiments[rose].status == ProfilingExperimentStatus.DONE + mock_getuser.assert_called_once() + + +def test_add_rose_experiment_requires_project(monkeypatch, manager): + """A project must be provided explicitly or through the PROJECT environment variable.""" + + monkeypatch.delenv("PROJECT", raising=False) + + with pytest.raises(ValueError, match="No project specified and PROJECT environment variable is not set"): + manager.add_rose_experiment("u-aa123") + + +def test_add_rose_experiment_rejects_missing_experiment_path(manager): + """The Rose experiment directory must exist before it can be managed.""" + + with ( + mock.patch("access.profiling.cylc_manager.Path.is_dir", return_value=False), + pytest.raises(ValueError, match="does not exist or is not a directory"), + ): + manager.add_rose_experiment("u-aa123", project="proj") + + +@mock.patch("access.profiling.cylc_manager.getpass.getuser", return_value="fake-user") +def test_add_rose_experiment_missing_run_path(mock_getuser, caplog, manager): + """Missing Cylc run directories are allowed, but stored as None.""" + + rose = "u-aa123" + experiment_path = manager.work_dir / rose + run_path = Path("/scratch") / "proj" / "fake-user" / "cylc-run" / rose + + with mock.patch("access.profiling.cylc_manager.Path.is_dir", autospec=True) as mock_is_dir: + mock_is_dir.side_effect = lambda path: path == experiment_path + with caplog.at_level(logging.WARNING): + manager.add_rose_experiment(rose, project="proj") + + assert manager.experiments[rose].path == experiment_path + assert manager.experiments[rose].run_path is None + assert f"Run path '{run_path}' does not exist" in caplog.text + mock_getuser.assert_called_once() + + +def test_delete_experiments_removes_path_and_run_path(tmp_path): + """delete_experiments should use ProfilingExperiment.path and run_path.""" + + manager = MockCylcManager(tmp_path / "work", tmp_path / "archive", layout_variable="um_layout") + exp_path = tmp_path / "work/u-aa123" + run_path = tmp_path / "runs/u-aa123" + exp_path.mkdir(parents=True) + run_path.mkdir(parents=True) + manager.experiments["u-aa123"] = ProfilingExperiment(path=exp_path, run_path=run_path) + + manager.delete_experiments(experiments=["u-aa123"]) + + assert "u-aa123" not in manager.experiments + assert not exp_path.exists() + assert not run_path.exists() + + +def test_delete_experiments_rejects_invalid_selection(manager): + """delete_experiments requires exactly one selection mode.""" + + with pytest.raises(ValueError, match="Pass either names=\\[\\.\\.\\.\\] or all_experiments=True"): + manager.delete_experiments(experiments=["u-aa123"], all_experiments=True) + + with pytest.raises(ValueError, match="No experiments specified"): + manager.delete_experiments() + + +def test_delete_experiments_rejects_unmanaged_experiment(manager): + """Only experiments tracked by this manager can be deleted.""" + + with pytest.raises(KeyError, match="not managed by this manager"): + manager.delete_experiments(experiments=["u-aa123"]) + + +def test_delete_experiments_warns_for_missing_directories(tmp_path, caplog): + """Missing experiment and run directories should warn but still remove manager state.""" + + manager = MockCylcManager(tmp_path / "work", tmp_path / "archive", layout_variable="um_layout") + exp_path = tmp_path / "work/u-aa123" + run_path = tmp_path / "runs/u-aa123" + manager.experiments["u-aa123"] = ProfilingExperiment(path=exp_path, run_path=run_path) + + with caplog.at_level(logging.WARNING): + manager.delete_experiments(experiments=["u-aa123"]) + + assert "u-aa123" not in manager.experiments + assert f"Experiment directory '{exp_path}' does not exist. Skipping deletion." in caplog.text + assert f"Run directory '{run_path}' does not exist. Skipping deletion." in caplog.text + + +def test_delete_experiments_dry_run_keeps_directories_and_manager_state(tmp_path, caplog): + """Dry runs should report actions without deleting directories or manager entries.""" + + manager = MockCylcManager(tmp_path / "work", tmp_path / "archive", layout_variable="um_layout") + exp_path = tmp_path / "work/u-aa123" + run_path = tmp_path / "runs/u-aa123" + exp_path.mkdir(parents=True) + run_path.mkdir(parents=True) + manager.experiments["u-aa123"] = ProfilingExperiment(path=exp_path, run_path=run_path) + + with caplog.at_level(logging.INFO): + manager.delete_experiments(experiments=["u-aa123"], dry_run=True) + + assert "u-aa123" in manager.experiments + assert exp_path.is_dir() + assert run_path.is_dir() + assert f"Dry run: would delete experiment directory '{exp_path}' and run directory '{run_path}'." in caplog.text + + +@mock.patch("access.profiling.cylc_manager.subprocess.run") +def test_run_experiments_skips_when_no_new_experiments(mock_subprocess, caplog, manager): + """run_experiments should do nothing when there are no NEW experiments.""" + + manager.experiments["u-aa123"] = ProfilingExperiment(path=Path("/fake/path")) + manager.experiments["u-aa123"].status = ProfilingExperimentStatus.DONE + + with caplog.at_level(logging.INFO): + manager.run_experiments() + + mock_subprocess.assert_not_called() + assert "No new experiments to run" in caplog.text + + +@mock.patch("access.profiling.cylc_manager.subprocess.run") +def test_run_experiments_calls_rose_suite_run(mock_subprocess, manager): + """run_experiments should call rose suite-run in each NEW experiment's path.""" + + exp_path = Path("/fake/u-aa123") + manager.experiments["u-aa123"] = ProfilingExperiment(path=exp_path) + manager.experiments["u-aa123"].status = ProfilingExperimentStatus.NEW + + manager.run_experiments() + + mock_subprocess.assert_called_once_with( + ["rose", "suite-run"], cwd=exp_path, check=True, capture_output=True, text=True + ) + + +@mock.patch("access.profiling.cylc_manager.subprocess.run") +def test_run_experiments_only_runs_new_experiments(mock_subprocess, manager): + """run_experiments should only submit experiments with NEW status.""" + + manager.experiments["new"] = ProfilingExperiment(path=Path("/fake/new")) + manager.experiments["new"].status = ProfilingExperimentStatus.NEW + manager.experiments["done"] = ProfilingExperiment(path=Path("/fake/done")) + manager.experiments["done"].status = ProfilingExperimentStatus.DONE + + manager.run_experiments() + + assert mock_subprocess.call_count == 1 + mock_subprocess.assert_called_once_with( + ["rose", "suite-run"], cwd=Path("/fake/new"), check=True, capture_output=True, text=True + ) + + +@mock.patch("access.profiling.cylc_manager.subprocess.run") +def test_run_experiments_forwards_output_with_prefix(mock_subprocess, caplog, manager): + """stdout and stderr lines should be logged with the experiment name as prefix.""" + + stdout = ( + "[INFO] export CYLC_VERSION=7.9.9\n" + "[INFO] export ROSE_ORIG_HOST=gadi-login-03.gadi.nci.org.au\n" + "[INFO] export ROSE_SITE=nci\n" + "[INFO] export ROSE_VERSION=2019.01.8\n" + ) + mock_subprocess.return_value = mock.MagicMock(stdout=stdout, stderr="warning: low disk\n") + manager.experiments["u-aa123"] = ProfilingExperiment(path=Path("/fake/path")) + manager.experiments["u-aa123"].status = ProfilingExperimentStatus.NEW + + with caplog.at_level(logging.INFO): + manager.run_experiments() + + assert "[u-aa123] [INFO] export CYLC_VERSION=7.9.9" in caplog.text + assert "[u-aa123] [INFO] export ROSE_ORIG_HOST=gadi-login-03.gadi.nci.org.au" in caplog.text + assert "[u-aa123] [INFO] export ROSE_SITE=nci" in caplog.text + assert "[u-aa123] [INFO] export ROSE_VERSION=2019.01.8" in caplog.text + assert "[u-aa123] warning: low disk" in caplog.text + + +def test_run_experiments_logs_stderr_on_failure(caplog, manager): + """stderr should be logged at ERROR level before the CalledProcessError propagates.""" + + error = subprocess.CalledProcessError(1, "rose suite-run", output="partial output\n", stderr="fatal: bad config\n") + with mock.patch("access.profiling.cylc_manager.subprocess.run", side_effect=error): + manager.experiments["u-aa123"] = ProfilingExperiment(path=Path("/fake/path")) + manager.experiments["u-aa123"].status = ProfilingExperimentStatus.NEW + + with caplog.at_level(logging.INFO), pytest.raises(subprocess.CalledProcessError): + manager.run_experiments() + + assert "[u-aa123] partial output" in caplog.text + assert "[u-aa123] fatal: bad config" in caplog.text + + +@mock.patch.object(ProfilingManager, "archive_experiments") +def test_archive_experiments_defaults(mock_archive, manager): + """Cylc archive defaults should be applied before calling the base manager.""" + + manager.archive_experiments() + mock_archive.assert_called_once_with( + exclude_dirs=[".svn", "share"], exclude_files=["*.nc"], follow_symlinks=False, overwrite=False + ) + mock_archive.reset_mock() + + manager.archive_experiments(exclude_dirs=["dir1"], exclude_files=["file1"], follow_symlinks=True, overwrite=True) + mock_archive.assert_called_once_with( + exclude_dirs=["dir1"], exclude_files=["file1"], follow_symlinks=True, overwrite=True + )