Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions FabNESO/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
"""Neptune Exploratory SOftware (NESO) plugin for FabSim3."""

try:
from .tasks import neso, neso_ensemble, neso_vbmc, neso_write_field
from .tasks import (
neso,
neso_ensemble,
neso_pce_analysis,
neso_pce_ensemble,
neso_vbmc,
neso_write_field,
)

__all__ = ["neso", "neso_ensemble", "neso_vbmc", "neso_write_field"]
__all__ = [
"neso",
"neso_ensemble",
"neso_vbmc",
"neso_write_field",
"neso_pce_ensemble",
"neso_pce_analysis",
]

except ImportError:
import warnings
Expand Down
225 changes: 224 additions & 1 deletion FabNESO/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,23 @@
Defines tasks for running simulations using Neptune Exploratory Software (NESO).
"""

import json
import pickle
import re
import shutil
import subprocess
import time
from contextlib import nullcontext
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from typing import Any, Literal, TypeAlias

import chaospy as cp
import numpy as np
import pandas as pd
import pyvbmc
from easyvvuq.analysis import PCEAnalysis
from easyvvuq.sampling import PCESampler

try:
from fabsim.base import fab
Expand Down Expand Up @@ -579,3 +586,219 @@ def log_density(
(config_dict["initial_run"] - observed_results["field_value"]) ** 2
/ (2 * config_dict["observation_noise_std"] ** 2)
).sum()


def _parse_pce_bounds_string(
parameter_scan_string: str,
delimiter: str,
) -> tuple[float, float]:
lower, upper = parameter_scan_string.split(delimiter)
return float(lower), float(upper)


def _parse_float_or_int_string_literal(literal: str) -> float | int:
try:
return int(literal)
except ValueError:
return float(literal)


PCEVariant: TypeAlias = Literal[
"pseudo-spectral", "pseudo-spectral-sparse", "point-collocation"
]


@fab.task
@fab.load_plugin_env_vars("FabNESO")
def neso_pce_ensemble(
config: str,
solver: str = "Electrostatic2D3V",
conditions_file_name: str = "conditions.xml",
mesh_file_name: str = "mesh.xml",
polynomial_order: int = 4,
variant: PCEVariant = "pseudo-spectral",
processes: str | int = 4,
nodes: str | int = 1,
cpus_per_process: str | int = 1,
wall_time: str = "00:15:00",
**parameter_bounds_or_overrides: str,
) -> None:
"""
Run ensemble of NESO simulations to perform a polynomial chaos expansion of outputs.

Generates a set of parameters values (and associated weights) to evaluate model at
using a quadrature rule, and evaluates model outputs at each of these parameter
values. The model outputs can then be approximated by an expansion in a set of
orthogonal (with respect to the assumed distribution over the parameter space)
polynomials, with the coefficients of the expansion estimated from the sampled
model outputs. This task just computes the model outputs for the sampled parameter
values, with the separate `neso_pce_analysis` task using the fetched model outputs
from this task to estimate the coefficients and so form the polynomial expansion
approximation to the model.

Args:
config: Directory with ensemble configuration information.

Keyword Args:
solver: Which NESO solver to use.
conditions_file_name: Name of conditions XML file in configuration directory.
mesh_file_name: Name of mesh XML in configuration directory.
polynomial_order: Polynomial order to use in polynomial chaos expansion.
variant: Polynomial chaos expansion variant to use - one of `point-collocation`
(point-collocation method), `pseudo-spectral` (pseudo-spectral projection
method) or `pseudo-spectral-sparse` (pseudo-spectral projection method with
Smolyak sparse grid).
processes: Number of processes to run in each job in the ensemble.
nodes: Number of nodes to run on. Only applicable when running on a multi-node
system.
cpus_per_process: Number of processing units to use per process. Only
applicable when running on a multi-node system.
wall_time: Maximum time to allow job to run for. Only applicable when submitting
to a job scheduler.
**parameter_bounds_or_overrides: Bounds of parameters to vary in polynomial
chaos expansion or fixed overrides for parameters from values in conditions
file in configuration directory. Each value is either a colon-separated
string `lower_bound:upper_bound` specifying lower and upper bounds for
independent uniform distributions over parameter values, or a string
specifying a single int or float, in which case the corresponding parameter
is considered fixed but the value given is used to override its default
value in the conditions file.

"""
_check_fab_module_imported()
parameter_distributions = {
parameter_name: cp.Uniform(*_parse_pce_bounds_string(string_value, ":"))
for parameter_name, string_value in parameter_bounds_or_overrides.items()
if ":" in string_value
}
parameter_overrides = {
parameter_name: _parse_float_or_int_string_literal(string_value)
for parameter_name, string_value in parameter_bounds_or_overrides.items()
if ":" not in string_value
}
# Map variant specifier to EasyVVUQ PCESampler keyword arguments - regression is
# used to switch between point-collocation (True) and pseudo-spectral (False)
# methods, while sparse (True) enables Smolyak sparse grid with pseudo-spectral
# method.
match variant:
case "point-collocation":
regression = True
sparse = False
case "pseudo-spectral":
regression = False
sparse = False
case "pseudo-spectral-sparse":
regression = False
sparse = True
pce_sampler = PCESampler(
parameter_distributions,
polynomial_order=int(polynomial_order),
regression=regression,
sparse=sparse,
)
parameter_samples = list(pce_sampler)
processes, nodes, cpus_per_process, wall_time = _check_and_process_resource_args(
processes, nodes, cpus_per_process, wall_time
)
path_to_config = Path(fab.find_config_file_path(config))
with TemporaryDirectory(
prefix=f"{config}_", dir=path_to_config.parent
) as temporary_config_directory:
temporary_config_path = Path(temporary_config_directory)
for sample_index, parameter_dict in enumerate(parameter_samples):
directory_path = temporary_config_path / "SWEEP" / f"sample_{sample_index}"
shutil.copytree(path_to_config, directory_path)
edit_parameters(
directory_path / conditions_file_name,
parameter_dict | parameter_overrides,
)
config = temporary_config_path.name
path_to_config = temporary_config_path
sweep_dir = str(path_to_config / "SWEEP")
fab.update_environment(
_create_job_args_dict(
solver,
conditions_file_name,
mesh_file_name,
processes,
nodes,
cpus_per_process,
wall_time,
)
)
fab.with_config(config)
fab.run_ensemble(config, sweep_dir)
job_name = template(fab.env.job_name_template)
local_results_directory = Path(fab.env.local_results) / job_name
local_results_directory.mkdir(parents=True)
with (local_results_directory / "pce_sampler.pickle").open("wb") as f:
pickle.dump(pce_sampler, f)
with (local_results_directory / "parameter_samples.json").open("w") as f:
json.dump(parameter_samples, f)
print( # noqa: T201
f"Sampler pickle and parameters JSON saved to {local_results_directory}.\n"
"Run fetch_results task once runs are completed to also save run outputs "
"to same directory and then run neso_pce_analysis task to compute "
"PCE expansion from run outputs."
)


@fab.task
@fab.load_plugin_env_vars("FabNESO")
def neso_pce_analysis(
config: Path | str,
results_dir: Path | str,
extract_outputs_script: str = "extract_outputs.py",
) -> None:
"""
Analyse outputs from a previous polynomial chaos expansion (PCE) ensemble run.

Uses run outputs for sampled parameter values to compute a PCE approximation to
model, and uses this to compute various statistics of output and allowing
construction of a surrogate model. The analysis results are saved to a pickle file
`pce_analysis_results.pickle` in the results directory.

Args:
config: Name of configuration directory with script to use to extract relevant
solver outputs from results files and output to a JSON file.
results_dir: Directory containing PCE ensemble outputs from a run of `neso_pce`
task. The analysis results pickle file will be written to this directory.

Keyword Args:
extract_outputs_script: Name of script for extracting outputs from results
files in configuration directory.

"""
_check_fab_module_imported()
path_to_config = Path(fab.find_config_file_path(config))
extract_outputs_script_path = path_to_config / extract_outputs_script
results_dir = Path(results_dir)
with (results_dir / "pce_sampler.pickle").open("rb") as f:
pce_sampler = pickle.load(f) # noqa: S301
with (results_dir / "parameter_samples.json").open("r") as f:
parameter_samples = json.load(f)
results_data = []
for sample_index, parameter_dict in enumerate(parameter_samples):
sample_results_dir = results_dir / "RUNS" / f"sample_{sample_index}"
outputs_file = sample_results_dir / "outputs.json"
subprocess.call(
[ # noqa: S603, S607
"python",
str(extract_outputs_script_path),
sample_results_dir,
outputs_file,
]
)
with outputs_file.open("r") as f:
outputs = json.load(f)
results_data.append(
{(key, i): v for key, value in outputs.items() for i, v in enumerate(value)}
| {(key, 0): value for key, value in parameter_dict.items()}
)
results_dataframe = pd.DataFrame(
results_data, columns=pd.MultiIndex.from_tuples(results_data[0].keys())
)
pce_analysis = PCEAnalysis(sampler=pce_sampler, qoi_cols=list(outputs.keys()))
analysis_results = pce_analysis.analyse(results_dataframe)
with (results_dir / "pce_analysis_results.pickle").open("wb") as f:
pickle.dump(analysis_results, f)
29 changes: 29 additions & 0 deletions config_files/2Din3D-hw/extract_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Extract energy and enstrophy values from CSV file and output to JSON file."""

import argparse
import json
from pathlib import Path

import pandas as pd


def extract_outputs(results_directory: str) -> dict[str, list[float]]:
"""Extract energy and enstrophy values from CSV file."""
growth_rates_dataframe = pd.read_csv(Path(results_directory) / "growth_rates.csv")
return {col: growth_rates_dataframe[col].to_list() for col in ("E", "W")}


if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"results_directory",
type=Path,
help="Directory solver outputs were written to",
)
parser.add_argument(
"output_file", type=Path, help="Path to write JSON output file to"
)
args = parser.parse_args()
outputs = extract_outputs(args.results_directory)
with args.output_file.open("w") as f:
json.dump(outputs, f)
43 changes: 43 additions & 0 deletions config_files/two_stream/extract_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
""""Extract line field derivatives at final time from HDF5 file and output to JSON."""

import argparse
import json
from pathlib import Path

import h5py


def _get_step_keys(hdf5_file: h5py.File) -> list[str]:
"""Get ordered list of step keys."""
return sorted(hdf5_file.keys(), key=lambda k: int(k.split("#")[-1]))


def extract_outputs(results_directory: str) -> dict[str, list[float]]:
"""Extract line field derivatives at final time from HDF5 file."""
hdf5_file_path = (
Path(results_directory)
/ "Electrostatic2D3V_line_field_deriv_evaluations.h5part"
)
with h5py.File(hdf5_file_path, "r") as hdf5_file:
step_keys = _get_step_keys(hdf5_file)
last_step_key = step_keys[-1]
return {
"x": list(hdf5_file[last_step_key]["x"]),
"phi": list(hdf5_file[last_step_key]["FIELD_EVALUATION_0"]),
}


if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"results_directory",
type=Path,
help="Directory solver outputs were written to",
)
parser.add_argument(
"output_file", type=Path, help="Path to write JSON output file to"
)
args = parser.parse_args()
outputs = extract_outputs(args.results_directory)
with args.output_file.open("w") as f:
json.dump(outputs, f)
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Configuration file for the Sphinx documentation builder.""" # noqa: INP001
"""Configuration file for the Sphinx documentation builder."""

from importlib.metadata import version as get_version

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ ignore = [
"D212", # multi-line-summary-first-line
"D407", # removed dashes lines under sections
"D417", # argument description in docstring (unreliable)
"INP001", # implicit-namespace-package - allow due to FabSim3 packaging conventions
"ISC001", # simplify implicit str concatenation (ruff-format recommended)
"N999", # invalid-module-name - follow FabSim3 convention of CamelCase plugin names
]
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
h5py>=3.10.0
PyVBMC>=1.0.1
easyvvuq>=1.2
chaospy>=4.3.2
pandas>=2.0