From 35691928a83391390a158e5d73af893dffcff9e8 Mon Sep 17 00:00:00 2001 From: Matt Graham Date: Thu, 15 Feb 2024 17:07:36 +0000 Subject: [PATCH 1/2] Add polynomial chaos expansion ensemble + analysis tasks --- FabNESO/__init__.py | 18 +- FabNESO/tasks.py | 225 ++++++++++++++++++++- config_files/2Din3D-hw/extract_outputs.py | 29 +++ config_files/two_stream/extract_outputs.py | 43 ++++ pyproject.toml | 1 + requirements.txt | 3 + 6 files changed, 316 insertions(+), 3 deletions(-) create mode 100644 config_files/2Din3D-hw/extract_outputs.py create mode 100644 config_files/two_stream/extract_outputs.py diff --git a/FabNESO/__init__.py b/FabNESO/__init__.py index c5bfed2..b953901 100644 --- a/FabNESO/__init__.py +++ b/FabNESO/__init__.py @@ -1,9 +1,23 @@ """Neptune Exploratory SOftware (NESO) plugin for FabSim3.""" try: - from .tasks import neso, neso_ensemble, neso_vbmc, neso_write_field + from .tasks import ( + neso, + neso_ensemble, + neso_pce_analysis, + neso_pce_ensemble, + neso_vbmc, + neso_write_field, + ) - __all__ = ["neso", "neso_ensemble", "neso_vbmc", "neso_write_field"] + __all__ = [ + "neso", + "neso_ensemble", + "neso_vbmc", + "neso_write_field", + "neso_pce_ensemble", + "neso_pce_analysis", + ] except ImportError: import warnings diff --git a/FabNESO/tasks.py b/FabNESO/tasks.py index 611441a..13bfc38 100644 --- a/FabNESO/tasks.py +++ b/FabNESO/tasks.py @@ -4,16 +4,23 @@ Defines tasks for running simulations using Neptune Exploratory Software (NESO). """ +import json +import pickle import re import shutil +import subprocess import time from contextlib import nullcontext from pathlib import Path from tempfile import TemporaryDirectory -from typing import Any +from typing import Any, Literal, TypeAlias +import chaospy as cp import numpy as np +import pandas as pd import pyvbmc +from easyvvuq.analysis import PCEAnalysis +from easyvvuq.sampling import PCESampler try: from fabsim.base import fab @@ -579,3 +586,219 @@ def log_density( (config_dict["initial_run"] - observed_results["field_value"]) ** 2 / (2 * config_dict["observation_noise_std"] ** 2) ).sum() + + +def _parse_pce_bounds_string( + parameter_scan_string: str, + delimiter: str, +) -> tuple[float, float]: + lower, upper = parameter_scan_string.split(delimiter) + return float(lower), float(upper) + + +def _parse_float_or_int_string_literal(literal: str) -> float | int: + try: + return int(literal) + except ValueError: + return float(literal) + + +PCEVariant: TypeAlias = Literal[ + "pseudo-spectral", "pseudo-spectral-sparse", "point-collocation" +] + + +@fab.task +@fab.load_plugin_env_vars("FabNESO") +def neso_pce_ensemble( + config: str, + solver: str = "Electrostatic2D3V", + conditions_file_name: str = "conditions.xml", + mesh_file_name: str = "mesh.xml", + polynomial_order: int = 4, + variant: PCEVariant = "pseudo-spectral", + processes: str | int = 4, + nodes: str | int = 1, + cpus_per_process: str | int = 1, + wall_time: str = "00:15:00", + **parameter_bounds_or_overrides: str, +) -> None: + """ + Run ensemble of NESO simulations to perform a polynomial chaos expansion of outputs. + + Generates a set of parameters values (and associated weights) to evaluate model at + using a quadrature rule, and evaluates model outputs at each of these parameter + values. The model outputs can then be approximated by an expansion in a set of + orthogonal (with respect to the assumed distribution over the parameter space) + polynomials, with the coefficients of the expansion estimated from the sampled + model outputs. This task just computes the model outputs for the sampled parameter + values, with the separate `neso_pce_analysis` task using the fetched model outputs + from this task to estimate the coefficients and so form the polynomial expansion + approximation to the model. + + Args: + config: Directory with ensemble configuration information. + + Keyword Args: + solver: Which NESO solver to use. + conditions_file_name: Name of conditions XML file in configuration directory. + mesh_file_name: Name of mesh XML in configuration directory. + polynomial_order: Polynomial order to use in polynomial chaos expansion. + variant: Polynomial chaos expansion variant to use - one of `point-collocation` + (point-collocation method), `pseudo-spectral` (pseudo-spectral projection + method) or `pseudo-spectral-sparse` (pseudo-spectral projection method with + Smolyak sparse grid). + processes: Number of processes to run in each job in the ensemble. + nodes: Number of nodes to run on. Only applicable when running on a multi-node + system. + cpus_per_process: Number of processing units to use per process. Only + applicable when running on a multi-node system. + wall_time: Maximum time to allow job to run for. Only applicable when submitting + to a job scheduler. + **parameter_bounds_or_overrides: Bounds of parameters to vary in polynomial + chaos expansion or fixed overrides for parameters from values in conditions + file in configuration directory. Each value is either a colon-separated + string `lower_bound:upper_bound` specifying lower and upper bounds for + independent uniform distributions over parameter values, or a string + specifying a single int or float, in which case the corresponding parameter + is considered fixed but the value given is used to override its default + value in the conditions file. + + """ + _check_fab_module_imported() + parameter_distributions = { + parameter_name: cp.Uniform(*_parse_pce_bounds_string(string_value, ":")) + for parameter_name, string_value in parameter_bounds_or_overrides.items() + if ":" in string_value + } + parameter_overrides = { + parameter_name: _parse_float_or_int_string_literal(string_value) + for parameter_name, string_value in parameter_bounds_or_overrides.items() + if ":" not in string_value + } + # Map variant specifier to EasyVVUQ PCESampler keyword arguments - regression is + # used to switch between point-collocation (True) and pseudo-spectral (False) + # methods, while sparse (True) enables Smolyak sparse grid with pseudo-spectral + # method. + match variant: + case "point-collocation": + regression = True + sparse = False + case "pseudo-spectral": + regression = False + sparse = False + case "pseudo-spectral-sparse": + regression = False + sparse = True + pce_sampler = PCESampler( + parameter_distributions, + polynomial_order=int(polynomial_order), + regression=regression, + sparse=sparse, + ) + parameter_samples = list(pce_sampler) + processes, nodes, cpus_per_process, wall_time = _check_and_process_resource_args( + processes, nodes, cpus_per_process, wall_time + ) + path_to_config = Path(fab.find_config_file_path(config)) + with TemporaryDirectory( + prefix=f"{config}_", dir=path_to_config.parent + ) as temporary_config_directory: + temporary_config_path = Path(temporary_config_directory) + for sample_index, parameter_dict in enumerate(parameter_samples): + directory_path = temporary_config_path / "SWEEP" / f"sample_{sample_index}" + shutil.copytree(path_to_config, directory_path) + edit_parameters( + directory_path / conditions_file_name, + parameter_dict | parameter_overrides, + ) + config = temporary_config_path.name + path_to_config = temporary_config_path + sweep_dir = str(path_to_config / "SWEEP") + fab.update_environment( + _create_job_args_dict( + solver, + conditions_file_name, + mesh_file_name, + processes, + nodes, + cpus_per_process, + wall_time, + ) + ) + fab.with_config(config) + fab.run_ensemble(config, sweep_dir) + job_name = template(fab.env.job_name_template) + local_results_directory = Path(fab.env.local_results) / job_name + local_results_directory.mkdir(parents=True) + with (local_results_directory / "pce_sampler.pickle").open("wb") as f: + pickle.dump(pce_sampler, f) + with (local_results_directory / "parameter_samples.json").open("w") as f: + json.dump(parameter_samples, f) + print( # noqa: T201 + f"Sampler pickle and parameters JSON saved to {local_results_directory}.\n" + "Run fetch_results task once runs are completed to also save run outputs " + "to same directory and then run neso_pce_analysis task to compute " + "PCE expansion from run outputs." + ) + + +@fab.task +@fab.load_plugin_env_vars("FabNESO") +def neso_pce_analysis( + config: Path | str, + results_dir: Path | str, + extract_outputs_script: str = "extract_outputs.py", +) -> None: + """ + Analyse outputs from a previous polynomial chaos expansion (PCE) ensemble run. + + Uses run outputs for sampled parameter values to compute a PCE approximation to + model, and uses this to compute various statistics of output and allowing + construction of a surrogate model. The analysis results are saved to a pickle file + `pce_analysis_results.pickle` in the results directory. + + Args: + config: Name of configuration directory with script to use to extract relevant + solver outputs from results files and output to a JSON file. + results_dir: Directory containing PCE ensemble outputs from a run of `neso_pce` + task. The analysis results pickle file will be written to this directory. + + Keyword Args: + extract_outputs_script: Name of script for extracting outputs from results + files in configuration directory. + + """ + _check_fab_module_imported() + path_to_config = Path(fab.find_config_file_path(config)) + extract_outputs_script_path = path_to_config / extract_outputs_script + results_dir = Path(results_dir) + with (results_dir / "pce_sampler.pickle").open("rb") as f: + pce_sampler = pickle.load(f) # noqa: S301 + with (results_dir / "parameter_samples.json").open("r") as f: + parameter_samples = json.load(f) + results_data = [] + for sample_index, parameter_dict in enumerate(parameter_samples): + sample_results_dir = results_dir / "RUNS" / f"sample_{sample_index}" + outputs_file = sample_results_dir / "outputs.json" + subprocess.call( + [ # noqa: S603, S607 + "python", + str(extract_outputs_script_path), + sample_results_dir, + outputs_file, + ] + ) + with outputs_file.open("r") as f: + outputs = json.load(f) + results_data.append( + {(key, i): v for key, value in outputs.items() for i, v in enumerate(value)} + | {(key, 0): value for key, value in parameter_dict.items()} + ) + results_dataframe = pd.DataFrame( + results_data, columns=pd.MultiIndex.from_tuples(results_data[0].keys()) + ) + pce_analysis = PCEAnalysis(sampler=pce_sampler, qoi_cols=list(outputs.keys())) + analysis_results = pce_analysis.analyse(results_dataframe) + with (results_dir / "pce_analysis_results.pickle").open("wb") as f: + pickle.dump(analysis_results, f) diff --git a/config_files/2Din3D-hw/extract_outputs.py b/config_files/2Din3D-hw/extract_outputs.py new file mode 100644 index 0000000..c1cc627 --- /dev/null +++ b/config_files/2Din3D-hw/extract_outputs.py @@ -0,0 +1,29 @@ +"""Extract energy and enstrophy values from CSV file and output to JSON file.""" + +import argparse +import json +from pathlib import Path + +import pandas as pd + + +def extract_outputs(results_directory: str) -> dict[str, list[float]]: + """Extract energy and enstrophy values from CSV file.""" + growth_rates_dataframe = pd.read_csv(Path(results_directory) / "growth_rates.csv") + return {col: growth_rates_dataframe[col].to_list() for col in ("E", "W")} + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "results_directory", + type=Path, + help="Directory solver outputs were written to", + ) + parser.add_argument( + "output_file", type=Path, help="Path to write JSON output file to" + ) + args = parser.parse_args() + outputs = extract_outputs(args.results_directory) + with args.output_file.open("w") as f: + json.dump(outputs, f) diff --git a/config_files/two_stream/extract_outputs.py b/config_files/two_stream/extract_outputs.py new file mode 100644 index 0000000..99418c8 --- /dev/null +++ b/config_files/two_stream/extract_outputs.py @@ -0,0 +1,43 @@ +""""Extract line field derivatives at final time from HDF5 file and output to JSON.""" + +import argparse +import json +from pathlib import Path + +import h5py + + +def _get_step_keys(hdf5_file: h5py.File) -> list[str]: + """Get ordered list of step keys.""" + return sorted(hdf5_file.keys(), key=lambda k: int(k.split("#")[-1])) + + +def extract_outputs(results_directory: str) -> dict[str, list[float]]: + """Extract line field derivatives at final time from HDF5 file.""" + hdf5_file_path = ( + Path(results_directory) + / "Electrostatic2D3V_line_field_deriv_evaluations.h5part" + ) + with h5py.File(hdf5_file_path, "r") as hdf5_file: + step_keys = _get_step_keys(hdf5_file) + last_step_key = step_keys[-1] + return { + "x": list(hdf5_file[last_step_key]["x"]), + "phi": list(hdf5_file[last_step_key]["FIELD_EVALUATION_0"]), + } + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "results_directory", + type=Path, + help="Directory solver outputs were written to", + ) + parser.add_argument( + "output_file", type=Path, help="Path to write JSON output file to" + ) + args = parser.parse_args() + outputs = extract_outputs(args.results_directory) + with args.output_file.open("w") as f: + json.dump(outputs, f) diff --git a/pyproject.toml b/pyproject.toml index 0d9e47b..e2cb19b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ ignore = [ "D212", # multi-line-summary-first-line "D407", # removed dashes lines under sections "D417", # argument description in docstring (unreliable) + "INP001", # implicit-namespace-package - allow due to FabSim3 packaging conventions "ISC001", # simplify implicit str concatenation (ruff-format recommended) "N999", # invalid-module-name - follow FabSim3 convention of CamelCase plugin names ] diff --git a/requirements.txt b/requirements.txt index 167df30..197ef6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,5 @@ h5py>=3.10.0 PyVBMC>=1.0.1 +easyvvuq>=1.2 +chaospy>=4.3.2 +pandas>=2.0 From 81f719b4b7ae954e0573c34748552d36a67c14a9 Mon Sep 17 00:00:00 2001 From: Matt Graham Date: Thu, 15 Feb 2024 18:12:10 +0000 Subject: [PATCH 2/2] Removing no longer required local noqa directive to satisfy Ruff --- docs/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/conf.py b/docs/conf.py index a9740b3..2702fa2 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,4 +1,4 @@ -"""Configuration file for the Sphinx documentation builder.""" # noqa: INP001 +"""Configuration file for the Sphinx documentation builder.""" from importlib.metadata import version as get_version