Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 150 additions & 13 deletions src/access/profiling/cylc_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# Copyright 2025 ACCESS-NRI and contributors. See the top-level COPYRIGHT file for details.
# SPDX-License-Identifier: Apache-2.0

import getpass
import logging
import os
import shutil
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path

from access.profiling.cylc_parser import CylcDBReader, CylcProfilingParser
from access.profiling.experiment import ProfilingLog
from access.profiling.experiment import ProfilingExperiment, ProfilingExperimentStatus, ProfilingLog
from access.profiling.manager import ProfilingManager
from access.profiling.parser import ProfilingParser

Expand Down Expand Up @@ -38,36 +42,169 @@ def known_parsers(self) -> dict[str, ProfilingParser]:
"""

def parse_ncpus(self, path: Path, run_path: Path | None = None) -> int:
# this is a symlink
config_path = path / "log/rose-suite-run.conf"
# both the run and original config will store cpu information
config_paths = []
if run_path is not None:
config_paths.append(run_path / "log/rose-suite-run.conf")
Comment thread
micaeljtoliveira marked this conversation as resolved.
config_paths.append(path / "rose-suite.conf")

if not config_path.is_file():
raise FileNotFoundError(f"Could not find suitable config file in {config_path}")
config_path = next((candidate for candidate in config_paths if candidate.is_file()), None)
if config_path is None:
tried = ", ".join(str(p) for p in config_paths)
raise FileNotFoundError(f"Could not find suitable config file. Tried: {tried}")

for line in config_path.read_text().splitlines():
if not line.startswith("!!"):
keypair = line.split("=")
if keypair[0].strip() == self._layout_variable:
layout = keypair[1].split(",")
if not line.startswith("!!") and "=" in line:
key, value = line.split("=", 1)
if key.strip() == self._layout_variable:
layout = value.split(",")
return int(layout[0].strip()) * int(layout[1].strip())

raise ValueError(f"Cannot find layout key, {self._layout_variable}, in {config_path}.")

def add_rose_experiment(self, rose: str, project: str | None = None) -> None:
"""Adds the given rose as an experiment to this manager.

Args:
rose (str): The rose to add as an experiment.
project (str): The project to use to look for the results. If no project is provided,
the project in the PROJECT environment variable will be used.

Raises:
ValueError: If no project is specified and the PROJECT environment variable is not set,
or if the experiment path does not exist.
"""
project = project or os.environ.get("PROJECT")
if project is None:
raise ValueError("No project specified and PROJECT environment variable is not set.")

experiment_path = self.work_dir / rose
if not experiment_path.is_dir():
raise ValueError(f"Experiment path '{experiment_path}' does not exist or is not a directory.")

run_path = Path("/scratch") / project / getpass.getuser() / "cylc-run" / rose

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic to build the run path seems to be very Gadi specific. I would prefer to keep this as general as possible, so could the function take the run_path as argument instead of the project?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like i can get cylc's expected run directory with cylc get-global-config -i '[hosts][localhost]run directory'. The downside being that you need cylc7 module loaded. Would that be preferred, or explicitly pass in run_path?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep this simple and explicitly pass the run path. Users can always use the cylc command in their notebooks if they feel like it.

if not run_path.is_dir():
logger.warning(f"Run path '{run_path}' does not exist. Archiving will only include experiment files.")
run_path = None

self.experiments[rose] = ProfilingExperiment(path=experiment_path, run_path=run_path)
self.experiments[rose].status = ProfilingExperimentStatus.DONE

def run_experiments(self) -> None:
    """Launches every NEW experiment with `rose suite-run` and relays the command output to the log.

    Each experiment is run from its own directory. Output of a successful run is logged
    (stdout as info, stderr as warning) and the experiment is flagged RUNNING. When the
    command fails, its stdout/stderr are logged (info/error) and the error is re-raised.

    Raises:
        subprocess.CalledProcessError: If `rose suite-run` exits with a non-zero status.
    """
    pending = {}
    for label, experiment in self.experiments.items():
        if experiment.status == ProfilingExperimentStatus.NEW:
            pending[label] = experiment

    if len(pending) == 0:
        logger.info("No new experiments to run. Will skip execution.")
        return

    command = ["rose", "suite-run"]
    for label, experiment in pending.items():
        logger.info(f"Running experiment '{label}' via rose suite-run in '{experiment.path}'.")
        try:
            # check=True turns a non-zero exit status into CalledProcessError
            completed = subprocess.run(command, cwd=experiment.path, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as failure:
            for out_line in failure.stdout.splitlines():
                logger.info(f"[{label}] {out_line}")
            for err_line in failure.stderr.splitlines():
                logger.error(f"[{label}] {err_line}")
            raise
        for out_line in completed.stdout.splitlines():
            logger.info(f"[{label}] {out_line}")
        for err_line in completed.stderr.splitlines():
            logger.warning(f"[{label}] {err_line}")
        experiment.status = ProfilingExperimentStatus.RUNNING

    # TODO: properly detect when running experiments have completed rather than marking them done immediately.
    for experiment in self.experiments.values():
        if experiment.status != ProfilingExperimentStatus.RUNNING:
            continue
        experiment.status = ProfilingExperimentStatus.DONE

def delete_experiments(
    self,
    experiments: list[str] | None = None,
    all_experiments: bool = False,
    dry_run: bool = False,
) -> None:
    """Deletes Rose Cylc experiments from the work directory and removes them from the manager.

    For each selected experiment the experiment directory and, when one is recorded, the
    run directory are removed from disk, then the experiment is dropped from the manager.
    Directories that no longer exist are skipped with a warning.

    Args:
        experiments (list[str] | None): List of experiment names to delete.
        all_experiments (bool): If True, deletes all experiments managed by this instance.
        dry_run (bool): If True, logs what would be deleted without making any changes. Defaults to False.

    Raises:
        ValueError: If both experiments and all_experiments are specified, or neither is.
        KeyError: If any experiment name is not managed by this instance.
    """
    names_to_delete = self._resolve_names(experiments, all_experiments)

    # Sorted so that deletions and log output happen in a reproducible order.
    for name in sorted(names_to_delete):
        exp = self.experiments[name]
        exp_path = exp.path
        run_path = exp.run_path
        if dry_run:
            # Only mention a run directory when the experiment actually has one.
            if run_path is None:
                logger.info(f"Dry run: would delete experiment directory '{exp_path}'.")
            else:
                logger.info(f"Dry run: would delete experiment directory '{exp_path}' and run directory '{run_path}'.")
            continue
        if exp_path.is_dir():
            logger.info(f"Deleting experiment directory '{exp_path}'.")
            shutil.rmtree(exp_path)
        else:
            logger.warning(f"Experiment directory '{exp_path}' does not exist. Skipping deletion.")
        if run_path is not None:
            if run_path.is_dir():
                logger.info(f"Deleting run directory '{run_path}'.")
                shutil.rmtree(run_path)
            else:
                logger.warning(f"Run directory '{run_path}' does not exist. Skipping deletion.")
        del self.experiments[name]

def archive_experiments(
    self,
    exclude_dirs: list[str] | None = None,
    exclude_files: list[str] | None = None,
    follow_symlinks: bool = False,
    overwrite: bool = False,
) -> None:
    """Archives experiments by delegating to the parent class with this manager's default exclusions.

    Args:
        exclude_dirs (list[str] | None): Directory patterns to leave out of the archive.
            Falls back to [".svn", "share"] when None.
        exclude_files (list[str] | None): File patterns to leave out of the archive.
            Falls back to ["*.nc"] when None.
        follow_symlinks (bool): Whether to follow symlinks when archiving. Defaults to False.
        overwrite (bool): Whether to overwrite existing archives. Defaults to False.
    """
    # Only None triggers the defaults; an explicit empty list is passed through unchanged.
    dir_patterns = [".svn", "share"] if exclude_dirs is None else exclude_dirs
    file_patterns = ["*.nc"] if exclude_files is None else exclude_files
    super().archive_experiments(
        exclude_dirs=dir_patterns,
        exclude_files=file_patterns,
        follow_symlinks=follow_symlinks,
        overwrite=overwrite,
    )

def profiling_logs(self, path: Path, run_path: Path | None = None) -> dict[str, ProfilingLog]:
"""Returns all profiling logs from the specified path.

Args:
path (Path): Path to the experiment directory.
run_path (Path | None): Optional path to a separate runs directory.
run_path (Path | None): Path to the Cylc run directory.
Returns:
dict[str, ProfilingLog]: Dictionary of profiling logs.
"""
if run_path is None:
raise ValueError("Cylc run_path is required to locate profiling logs.")

logs = {}

# setup log paths
suite_log = path / "log/suite/log" # cylc log file
cylcdb = path / "cylc-suite.db" # database with task runtimes
jobdir = path / "log/job" # where task logs are stored
suite_log = run_path / "log/suite/log" # cylc log file
cylcdb = run_path / "cylc-suite.db" # database with task runtimes
jobdir = run_path / "log/job" # where task logs are stored

logs["cylc_suite_log"] = ProfilingLog(suite_log, CylcProfilingParser())
# cylcdb.read_text = lambda x: x # hack to make log work
Expand Down
28 changes: 28 additions & 0 deletions src/access/profiling/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,34 @@ def delete_experiment(self, name: str) -> None:
else:
logger.warning(f"Experiment '{name}' not found; cannot delete.")

def _resolve_names(self, names: list[str] | None, all_experiments: bool) -> set[str]:
"""Validates and resolves a selection of experiment names to operate on.

Args:
names: Explicit list of experiment names, or None when all_experiments is True.
all_experiments: If True, selects all managed experiments.

Returns:
Set of experiment names to operate on.

Raises:
ValueError: If both names and all_experiments are provided, or neither is.
KeyError: If any specified name is not managed by this instance.
"""
if all_experiments and names is not None:
raise ValueError("Pass either names=[...] or all_experiments=True, not both.")
if not all_experiments and not names:
raise ValueError("No experiments specified. Pass either names=[...] or all_experiments=True.")
existing = set(self.experiments.keys())
to_process = existing if all_experiments else set(names)
unmanaged = [e for e in to_process if e not in existing]
if unmanaged:
raise KeyError(
f"Experiments {unmanaged} are not managed by this manager "
f"(existing: {existing}). Please check the names and try again."
)
return to_process

def parse_profiling_data(self):
"""Parses profiling data from the experiments."""
self.data = {}
Expand Down
15 changes: 1 addition & 14 deletions src/access/profiling/payu_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,21 +227,8 @@ def delete_experiments(
dry_run (bool): If True, performs a dry run without deleting files. Defaults to False.
remove_repo_dir (bool): If True, removes the base repository directory if no branches are using it.
"""
if all_branches and branches is not None:
raise ValueError("Pass either branches=[...] or all_branches=True")
branches_to_delete = self._resolve_names(branches, all_branches)

if not all_branches and not branches:
raise ValueError("No branches specified for delete! Pass either branches=[...] or all_branches=True")

existing_branches = set(self.experiments.keys())
branches_to_delete = existing_branches if all_branches else set(branches)

unmanaged_branches = [i for i in branches_to_delete if i not in existing_branches]
if unmanaged_branches:
raise KeyError(
f"Branches {unmanaged_branches} are not managed by the PayuManager"
f" (existing branches: {existing_branches}). Please check the branch names and try again."
)
if not branches_to_delete:
logger.info("No branches to delete after checking against existing branches. Will skip delete.")
return
Expand Down
Loading
Loading