-
Notifications
You must be signed in to change notification settings - Fork 2
Implement add_rose_experiment and delete/archive/run_experiments to CylcRoseManager #59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
09a14aa
eaaf06c
4f4890a
602a727
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,16 @@ | ||
| # Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import getpass | ||
| import logging | ||
| import os | ||
| import shutil | ||
| import subprocess | ||
| from abc import ABC, abstractmethod | ||
| from pathlib import Path | ||
|
|
||
| from access.profiling.cylc_parser import CylcDBReader, CylcProfilingParser | ||
| from access.profiling.experiment import ProfilingLog | ||
| from access.profiling.experiment import ProfilingExperiment, ProfilingExperimentStatus, ProfilingLog | ||
| from access.profiling.manager import ProfilingManager | ||
| from access.profiling.parser import ProfilingParser | ||
|
|
||
|
|
@@ -38,36 +42,169 @@ def known_parsers(self) -> dict[str, ProfilingParser]: | |
| """ | ||
|
|
||
| def parse_ncpus(self, path: Path, run_path: Path | None = None) -> int: | ||
| # this is a symlink | ||
| config_path = path / "log/rose-suite-run.conf" | ||
| # both the run and original config will store cpu information | ||
| config_paths = [] | ||
| if run_path is not None: | ||
| config_paths.append(run_path / "log/rose-suite-run.conf") | ||
| config_paths.append(path / "rose-suite.conf") | ||
|
|
||
| if not config_path.is_file(): | ||
| raise FileNotFoundError(f"Could not find suitable config file in {config_path}") | ||
| config_path = next((candidate for candidate in config_paths if candidate.is_file()), None) | ||
| if config_path is None: | ||
| tried = ", ".join(str(p) for p in config_paths) | ||
| raise FileNotFoundError(f"Could not find suitable config file. Tried: {tried}") | ||
|
|
||
| for line in config_path.read_text().splitlines(): | ||
| if not line.startswith("!!"): | ||
| keypair = line.split("=") | ||
| if keypair[0].strip() == self._layout_variable: | ||
| layout = keypair[1].split(",") | ||
| if not line.startswith("!!") and "=" in line: | ||
| key, value = line.split("=", 1) | ||
| if key.strip() == self._layout_variable: | ||
| layout = value.split(",") | ||
| return int(layout[0].strip()) * int(layout[1].strip()) | ||
|
|
||
| raise ValueError(f"Cannot find layout key, {self._layout_variable}, in {config_path}.") | ||
|
|
||
| def add_rose_experiment(self, rose: str, project: str | None = None) -> None: | ||
| """Adds the given rose as an experiment to this manager. | ||
|
|
||
| Args: | ||
| rose (str): The rose to add as an experiment. | ||
| project (str): The project to use to look for the results. If no project is provided, | ||
| the project in the PROJECT environment variable will be used. | ||
|
|
||
| Raises: | ||
| ValueError: If no project is specified and the PROJECT environment variable is not set, | ||
| or if the experiment path does not exist. | ||
| """ | ||
| project = project or os.environ.get("PROJECT") | ||
| if project is None: | ||
| raise ValueError("No project specified and PROJECT environment variable is not set.") | ||
|
|
||
| experiment_path = self.work_dir / rose | ||
| if not experiment_path.is_dir(): | ||
| raise ValueError(f"Experiment path '{experiment_path}' does not exist or is not a directory.") | ||
|
|
||
| run_path = Path("/scratch") / project / getpass.getuser() / "cylc-run" / rose | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The logic to build the run path seems to be very Gadi-specific. I would prefer to keep this as general as possible, so could the function take the [run path as an argument — inline code lost in extraction; reconstructed from the follow-up comment below]?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks like I can get Cylc's expected run directory with [inline command lost in extraction].
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would keep this simple and explicitly pass the run path. Users can always use the cylc command in their notebooks if they feel like it. |
||
| if not run_path.is_dir(): | ||
| logger.warning(f"Run path '{run_path}' does not exist. Archiving will only include experiment files.") | ||
| run_path = None | ||
|
|
||
| self.experiments[rose] = ProfilingExperiment(path=experiment_path, run_path=run_path) | ||
| self.experiments[rose].status = ProfilingExperimentStatus.DONE | ||
|
|
||
| def run_experiments(self) -> None: | ||
| """Runs Rose Cylc experiments via `rose suite-run` for profiling data generation.""" | ||
|
|
||
| to_run = {name: exp for name, exp in self.experiments.items() if exp.status == ProfilingExperimentStatus.NEW} | ||
|
|
||
| if not to_run: | ||
| logger.info("No new experiments to run. Will skip execution.") | ||
| return | ||
|
|
||
| for name, exp in to_run.items(): | ||
| logger.info(f"Running experiment '{name}' via rose suite-run in '{exp.path}'.") | ||
| try: | ||
| result = subprocess.run(["rose", "suite-run"], cwd=exp.path, check=True, capture_output=True, text=True) | ||
| except subprocess.CalledProcessError as e: | ||
| for line in e.stdout.splitlines(): | ||
| logger.info(f"[{name}] {line}") | ||
| for line in e.stderr.splitlines(): | ||
| logger.error(f"[{name}] {line}") | ||
| raise | ||
| for line in result.stdout.splitlines(): | ||
| logger.info(f"[{name}] {line}") | ||
| for line in result.stderr.splitlines(): | ||
| logger.warning(f"[{name}] {line}") | ||
| exp.status = ProfilingExperimentStatus.RUNNING | ||
|
|
||
| # TODO: properly detect when running experiments have completed rather than marking them done immediately. | ||
| for exp in self.experiments.values(): | ||
| if exp.status == ProfilingExperimentStatus.RUNNING: | ||
| exp.status = ProfilingExperimentStatus.DONE | ||
|
|
||
| def delete_experiments( | ||
| self, | ||
| experiments: list[str] | None = None, | ||
| all_experiments: bool = False, | ||
| dry_run: bool = False, | ||
| ) -> None: | ||
| """Deletes Rose Cylc experiments from the work directory and removes them from the manager. | ||
|
|
||
| Args: | ||
| experiments (list[str] | None): List of experiment names to delete. | ||
| all_experiments (bool): If True, deletes all experiments managed by this instance. | ||
| dry_run (bool): If True, logs what would be deleted without making any changes. Defaults to False. | ||
|
|
||
| Raises: | ||
| ValueError: If both experiments and all_experiments are specified, or neither is. | ||
| KeyError: If any experiment name is not managed by this instance. | ||
| """ | ||
| names_to_delete = self._resolve_names(experiments, all_experiments) | ||
|
|
||
| for name in names_to_delete: | ||
| exp = self.experiments[name] | ||
| exp_path = exp.path | ||
| run_path = exp.run_path | ||
| if dry_run: | ||
| logger.info(f"Dry run: would delete experiment directory '{exp_path}' and run directory '{run_path}'.") | ||
| continue | ||
| if exp_path.is_dir(): | ||
| logger.info(f"Deleting experiment directory '{exp_path}'.") | ||
| shutil.rmtree(exp_path) | ||
| else: | ||
| logger.warning(f"Experiment directory '{exp_path}' does not exist. Skipping deletion.") | ||
| if run_path is not None: | ||
| if run_path.is_dir(): | ||
| logger.info(f"Deleting run directory '{run_path}'.") | ||
| shutil.rmtree(run_path) | ||
| else: | ||
| logger.warning(f"Run directory '{run_path}' does not exist. Skipping deletion.") | ||
| del self.experiments[name] | ||
|
|
||
| def archive_experiments( | ||
| self, | ||
| exclude_dirs: list[str] | None = None, | ||
| exclude_files: list[str] | None = None, | ||
| follow_symlinks: bool = False, | ||
| overwrite: bool = False, | ||
| ) -> None: | ||
| """Archives completed experiments to the specified archive path. | ||
|
|
||
| Args: | ||
| exclude_dirs (list[str] | None): Directory patterns to exclude when archiving. Defaults to | ||
| [".svn", "share"] if not provided. | ||
| exclude_files (list[str] | None): File patterns to exclude when archiving. Defaults to | ||
| ["*.nc"] if not provided. | ||
| follow_symlinks (bool): Whether to follow symlinks when archiving. Defaults to False. | ||
| overwrite (bool): Whether to overwrite existing archives. Defaults to False. | ||
| """ | ||
| if exclude_dirs is None: | ||
| exclude_dirs = [".svn", "share"] | ||
| if exclude_files is None: | ||
| exclude_files = ["*.nc"] | ||
| super().archive_experiments( | ||
| exclude_dirs=exclude_dirs, | ||
| exclude_files=exclude_files, | ||
| follow_symlinks=follow_symlinks, | ||
| overwrite=overwrite, | ||
| ) | ||
|
|
||
| def profiling_logs(self, path: Path, run_path: Path | None = None) -> dict[str, ProfilingLog]: | ||
| """Returns all profiling logs from the specified path. | ||
|
|
||
| Args: | ||
| path (Path): Path to the experiment directory. | ||
| run_path (Path | None): Optional path to a separate runs directory. | ||
| run_path (Path | None): Path to the Cylc run directory. | ||
| Returns: | ||
| dict[str, ProfilingLog]: Dictionary of profiling logs. | ||
| """ | ||
| if run_path is None: | ||
| raise ValueError("Cylc run_path is required to locate profiling logs.") | ||
|
|
||
| logs = {} | ||
|
|
||
| # setup log paths | ||
| suite_log = path / "log/suite/log" # cylc log file | ||
| cylcdb = path / "cylc-suite.db" # database with task runtimes | ||
| jobdir = path / "log/job" # where task logs are stored | ||
| suite_log = run_path / "log/suite/log" # cylc log file | ||
| cylcdb = run_path / "cylc-suite.db" # database with task runtimes | ||
| jobdir = run_path / "log/job" # where task logs are stored | ||
|
|
||
| logs["cylc_suite_log"] = ProfilingLog(suite_log, CylcProfilingParser()) | ||
| # cylcdb.read_text = lambda x: x # hack to make log work | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.