From ae0411c537fe5187215a9bf6adae3b1782e2bce1 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Wed, 14 May 2025 12:35:17 +0200 Subject: [PATCH 01/19] cli for building forecast inference datasets --- src/mlwm/create_forecast_inference_dataset.py | 227 ++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 src/mlwm/create_forecast_inference_dataset.py diff --git a/src/mlwm/create_forecast_inference_dataset.py b/src/mlwm/create_forecast_inference_dataset.py new file mode 100644 index 0000000..c7b10db --- /dev/null +++ b/src/mlwm/create_forecast_inference_dataset.py @@ -0,0 +1,227 @@ +import datetime +import tempfile +from pathlib import Path + +import isodate +import rich +import xarray as xr +from loguru import logger + +from . import config as mdp_config +from .create_dataset import create_dataset + + +def parse_key_value_arg(arg: str) -> tuple[str, str]: + """ + Parse a single key=value argument into a dictionary. + + This function is intended for use with argparse's `type=` argument when parsing + command-line arguments like: --overwrite-source-paths key1=value1 key2=value2 + + Args: + arg (str): A string in the format key=value. + + Returns: + Dict[str, str]: A dictionary with one key-value pair parsed from the + input string. + + Raises: + argparse.ArgumentTypeError: If the argument is not in key=value format. + """ + import argparse + + if "=" not in arg: + raise argparse.ArgumentTypeError( + f"Invalid format: '{arg}'. Expected key=value." + ) + + key, value = arg.split("=", 1) + return (key, value) + + +def create_forecast_inference_dataset( + fp_config: str, + analysis_time: datetime.datetime, + overwrite_input_paths: dict, + use_stats_from_path: str, +): + """ + Create forecasting prediction dataset derived from a config file used during + training. In creating the inference dataset, it is assumed that the `time` + dimension of all input datasets used should be replaced by the + `analysis_time` and `elapsed_forecast_duration` dimensions. 
+ """ + + # the new sampling dimension is `analysis_time` + old_sampling_dim = "time" + sampling_dim = "analysis_time" + # instead of only having `time` as dimension, the input forecast datasets + # have two dimensions that describe the time value [analysis_time, + # elapsed_forecast_duration] + dim_replacements = dict( + time=["analysis_time", "elapsed_forecast_duration"], + ) + # there will be a single split called "test" + split_name = "test" + # which will have a single time slice, given by the analysis time argument + # to the script + sampling_coord_range = dict( + start=analysis_time, + end=analysis_time, + ) + + # load and modify the original config file + config = mdp_config.Config.from_yaml_file(file=fp_config) + + if overwrite_input_paths: + for key, value in overwrite_input_paths.items(): + if key not in config.inputs: + raise ValueError( + f"Key {key} not found in config inputs. " + f"Available keys are: {list(config.inputs.keys())}" + ) + logger.info( + f"Overwriting input path for {key} with {value} previously " + f"{config.inputs[key].path}" + ) + config.inputs[key].path = value + + # setup the split (test) for the dataset with a coordinate range along the + # sampling dimension (analysis_time) of length 1 + config.output.splitting = mdp_config.Splitting( + dim=sampling_dim, + splits={split_name: mdp_config.Split(**sampling_coord_range)}, + ) + + # ensure the output data is sampled along the sampling dimension + # (analysis_time) too + config.output.coord_ranges = { + sampling_dim: analysis_time # mdp_config.Range(**sampling_coord_range) + } + + config.output.chunking = {sampling_dim: 1} + + # replace old sampling_dimension (time) dimension in outputs with + # [`analysis_time`, `elapsed_forecast_time`] + for variable, dims in config.output.variables.items(): + if old_sampling_dim in dims: + orig_sampling_dim_index = dims.index(old_sampling_dim) + dims.remove(old_sampling_dim) + for dim in dim_replacements[old_sampling_dim][::-1]: + 
dims.insert(orig_sampling_dim_index, dim) + config.output.variables[variable] = dims + logger.info( + f"Replaced {old_sampling_dim} dimension with " + f"{dim_replacements[old_sampling_dim]} for {variable}" + ) + + # these dimensions should also be "renamed" from the input datasets + for input_name in config.inputs.keys(): + if "time" in config.inputs[input_name].dim_mapping: + dims = config.inputs[input_name].dims + orig_sampling_dim_index = dims.index(old_sampling_dim) + dims.remove(old_sampling_dim) + for dim in dim_replacements[old_sampling_dim][::-1]: + dims.insert(orig_sampling_dim_index, dim) + config.inputs[input_name].dims = dims + + del config.inputs[input_name].dim_mapping[old_sampling_dim] + + # add new "rename" dim-mappins for `analysis_time` and + # `elapsed_forecast_duration` + for dim in dim_replacements[old_sampling_dim]: + config.inputs[input_name].dim_mapping[ + dim + ] = mdp_config.DimMapping(method="rename", dim=dim) + + # save config to temporary filepath + tmpfile = tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") + fp_config_temp = Path(tmpfile.name) + config.to_yaml_file(fp_config_temp) + logger.info(f"Temporary config file created at {fp_config_temp}") + + rich.print(config) + + if use_stats_from_path is not None: + ds_stats = xr.open_zarr(use_stats_from_path) + + ds = create_dataset(config=config, ds_stats=ds_stats) + + return ds, config + + +def cli(argv=None): + import argparse + + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("config", help="Path to the config file", type=Path) + parser.add_argument( + "-o", + "--output", + help="Path to the output zarr file", + type=Path, + default=None, + ) + + parser.add_argument( + "--overwrite-input-paths", + nargs="*", + type=parse_key_value_arg, + help=( + "List of key=value pairs used to overwrite input paths of named " + "inputs in the config file. 
For example: --overwrite-input-paths " + "danra_surface=s3://mybucket/2025-05-01T1200Z/danra_surface.zarr " + "danra_height_levels=s3://mybucket/2025-05-01T1200Z/" + "danra_height_levels.zarr" + ), + ) + parser.add_argument( + "--analysis_time", + required=True, + help="Analysis time to use for the dataset. This is used to select the " + "correct time slice from the input data.", + type=isodate.parse_datetime, + ) + parser.add_argument( + "--use-stats-from-path", + help="Path to zarr dataset with stats to use in the new dataset. " + "Using the option will cause mllam-data-prep to skip calculating " + "stats and instead use the stats from the provided path.", + type=Path, + default=None, + ) + + args = parser.parse_args(argv) + + analysis_time = args.analysis_time + use_stats_from_path = args.use_stats_from_path + fp_config = Path(args.config) + + # Convert the list of tuples to a dictionary + overwrite_input_paths = dict(args.overwrite_input_paths) + + ds, config = create_forecast_inference_dataset( + analysis_time=analysis_time, + fp_config=fp_config, + overwrite_input_paths=overwrite_input_paths, + use_stats_from_path=use_stats_from_path, + ) + + dataset_name = ( + f"{Path(fp_config).name}." 
+ f"{isodate.datetime_isoformat(analysis_time).replace(':','')}" + ) + fp_config_inference = f"{dataset_name}.yaml" + fp_dataset_inference = f"{dataset_name}.zarr" + + logger.info(f"Writing inference dataset to {fp_dataset_inference}") + ds.to_zarr(fp_dataset_inference, mode="w", consolidated=True) + + logger.info(f"Writing inference config to {fp_config_inference}") + config.to_yaml_file(fp_config_inference) + + +if __name__ == "__main__": + cli() From 7705d85b88a3c4f4bee8a0d73c32c8a8c8ac9e0a Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Thu, 18 Sep 2025 09:44:43 +0200 Subject: [PATCH 02/19] working inference dataset creation --- .../surface-dummy-model_DINI/.gitignore | 4 + .../surface-dummy-model_DINI/pyproject.toml | 40 ++++ .../src/run_inference.py | 182 ++++++++++++++++++ src/mlwm/tests/test_paths.py | 3 +- 4 files changed, 227 insertions(+), 2 deletions(-) create mode 100644 configurations/surface-dummy-model_DINI/.gitignore create mode 100644 configurations/surface-dummy-model_DINI/pyproject.toml create mode 100644 configurations/surface-dummy-model_DINI/src/run_inference.py diff --git a/configurations/surface-dummy-model_DINI/.gitignore b/configurations/surface-dummy-model_DINI/.gitignore new file mode 100644 index 0000000..987012e --- /dev/null +++ b/configurations/surface-dummy-model_DINI/.gitignore @@ -0,0 +1,4 @@ +*.zip +*.zarr/ +inference_artifact/ +*.yaml diff --git a/configurations/surface-dummy-model_DINI/pyproject.toml b/configurations/surface-dummy-model_DINI/pyproject.toml new file mode 100644 index 0000000..9d016cc --- /dev/null +++ b/configurations/surface-dummy-model_DINI/pyproject.toml @@ -0,0 +1,40 @@ +[project] +name = "wlwm-surface-dummy-model_DINI" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "parse>=1.20.2", + "dask>=2025.4.1", + "dotenv>=0.9.9", + "ipdb>=0.13.13", + "s3fs>=2025.3.2", + "tqdm>=4.67.1", + "universal-pathlib>=0.2.6", + "zarr>=3.0", + 
"mllam-data-prep", + "ipython>=8.37.0", +] + +[dependency-groups] +dev = [ + "pre-commit>=4.2.0", + "pytest>=8.3.5", +] + +[tool.isort] +profile = "black" + +[tool.uv.sources] +mllam-data-prep = { git = "https://github.com/leifdenby/mllam-data-prep", rev = "feat/inference-cli-args" } +[build-system] +requires = ["setuptools>=61", "setuptools_scm"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +package-dir = {"" = "src"} +include-package-data = true + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/configurations/surface-dummy-model_DINI/src/run_inference.py b/configurations/surface-dummy-model_DINI/src/run_inference.py new file mode 100644 index 0000000..e951128 --- /dev/null +++ b/configurations/surface-dummy-model_DINI/src/run_inference.py @@ -0,0 +1,182 @@ +import copy +import datetime +from typing import Dict + +import mllam_data_prep as mdp +import mllam_data_prep.config as mdp_config +import xarray as xr +from loguru import logger + + +@logger.catch(reraise=True) +def _create_inference_datastore_config( + training_config: mdp.Config, + forecast_analysis_time: datetime.datetime, + forecast_duration: datetime.timedelta, + overwrite_input_paths: Dict[str, str] = {}, + sampling_dim: str = "time", +) -> mdp.Config: + """ + From a training datastore config, create an inference datastore config that: + - samples along a new sampling dimension `sampling_dim` (default: + `analysis_time`) instead of `time` + - has a single split called "test" with a single time slice given by the + `forecast_analysis_time` argument + - optionally overwrites input paths with the `overwrite_input_paths` argument + - ensures that the output variables have the correct dimensions, i.e. + replacing `time` with [`analysis_time`, `elapsed_forecast_duration`] + - ensures that the input datasets have the correct dimensions and dim_mappings, + i.e. 
replacing `time` with [`analysis_time`, `elapsed_forecast_duration` + + Parameters + ---------- + training_config : mdp.Config + The training config to base the inference config on + forecast_analysis_time : datetime.datetime + The analysis time to use for the inference config + forecast_duration : datetime.timedelta + The forecast duration to use for the inference config + overwrite_input_paths : Dict[str, str], optional + A dictionary of input names and paths to overwrite in the training config, + by default {} + sampling_dim : str, optional + The new sampling dimension to use, by default "time" + + Returns + ------- + mdp.Config + The inference config + """ + # the new sampling dimension is `analysis_time` + old_sampling_dim = "time" + sampling_dim = "analysis_time" + # instead of only having `time` as dimension, the input forecast datasets + # have two dimensions that describe the time value [analysis_time, + # elapsed_forecast_duration] + dim_replacements = dict( + time=["analysis_time", "elapsed_forecast_duration"], + ) + # there will be a single split called "test" + split_name = "test" + # which will have a single time slice, given by the analysis time argument + # to the script + sampling_coord_range = dict( + start=forecast_analysis_time, + end=forecast_analysis_time + forecast_duration, + ) + + inference_config = copy.deepcopy(training_config) + + if len(overwrite_input_paths) > 0: + for key, value in overwrite_input_paths.items(): + if key not in training_config.inputs: + raise ValueError( + f"Key {key} not found in config inputs. 
" + f"Available keys are: {list(training_config.inputs.keys())}" + ) + logger.info( + f"Overwriting input path for {key} with {value} previously " + f"{training_config.inputs[key].path}" + ) + inference_config.inputs[key].path = value + + # setup the split (test) for the dataset with a coordinate range along the + # sampling dimension (analysis_time) of length 1 + inference_config.output.splitting = mdp_config.Splitting( + dim=sampling_dim, + splits={split_name: mdp_config.Split(**sampling_coord_range)}, + ) + + # ensure the output data is sampled along the sampling dimension + # (analysis_time) too + inference_config.output.coord_ranges = { + sampling_dim: mdp_config.Range(**sampling_coord_range) + } + + inference_config.output.chunking = {sampling_dim: 1} + + # replace old sampling_dimension (time) dimension in outputs with + # [`analysis_time`, `elapsed_forecast_time`] + for variable, dims in training_config.output.variables.items(): + if old_sampling_dim in dims: + orig_sampling_dim_index = dims.index(old_sampling_dim) + dims.remove(old_sampling_dim) + for dim in dim_replacements[old_sampling_dim][::-1]: + dims.insert(orig_sampling_dim_index, dim) + inference_config.output.variables[variable] = dims + logger.info( + f"Replaced {old_sampling_dim} dimension with" + f" {dim_replacements[old_sampling_dim]} for {variable}" + ) + + # these dimensions should also be "renamed" from the input datasets + for input_name in training_config.inputs.keys(): + if "time" in training_config.inputs[input_name].dim_mapping: + dims = training_config.inputs[input_name].dims + orig_sampling_dim_index = dims.index(old_sampling_dim) + dims.remove(old_sampling_dim) + for dim in dim_replacements[old_sampling_dim][::-1]: + dims.insert(orig_sampling_dim_index, dim) + inference_config.inputs[input_name].dims = dims + + del inference_config.inputs[input_name].dim_mapping[ + old_sampling_dim + ] + + # add new "rename" dim-mappins for `analysis_time` and + # `elapsed_forecast_duration` + for 
dim in dim_replacements[old_sampling_dim]: + inference_config.inputs[input_name].dim_mapping[ + dim + ] = mdp_config.DimMapping(method="rename", dim=dim) + + return inference_config + + +def main(): + fp_stats = "inference_artifact/stats/danra.datastore.stats.zarr" + fp_training_datastore = "inference_artifact/configs/danra.datastore.yaml" + + S3_BUCKET_URL = "https://object-store.os-api.cci1.ecmwf.int/danra" + overwrite_input_paths = dict( + danra_surface=f"{S3_BUCKET_URL}/v0.6.0dev1/single_levels.zarr/", + danra_static=f"{S3_BUCKET_URL}/v0.5.0/single_levels.zarr/", + ) + analysis_time = "2019-02-04T12:00" + forecast_duration = datetime.timedelta(hours=6) + + inference_datastore_config_output_fp = "danra.inference.datastore.yaml" + + ds_stats = xr.open_dataset(fp_stats) + logger.debug(f"Opened stats dataset: {ds_stats}") + + logger.debug( + f"Loading training datastore config from {fp_training_datastore}" + ) + datastore_training_config = mdp.Config.from_yaml_file( + fp_training_datastore + ) + + inference_config = _create_inference_datastore_config( + training_config=datastore_training_config, + forecast_analysis_time=datetime.datetime.fromisoformat(analysis_time), + forecast_duration=forecast_duration, + overwrite_input_paths=overwrite_input_paths, + sampling_dim="analysis_time", + ) + + # save inference config to file + inference_config.to_yaml_file(inference_datastore_config_output_fp) + logger.info( + f"Saved inference datastore config to {inference_datastore_config_output_fp}" + ) + + ds = mdp.create_dataset(config=inference_config, ds_stats=ds_stats) + print(ds) + + +if __name__ == "__main__": + import ipdb + + with ipdb.launch_ipdb_on_exception(): + main() diff --git a/src/mlwm/tests/test_paths.py b/src/mlwm/tests/test_paths.py index e8eaa9e..91adf24 100644 --- a/src/mlwm/tests/test_paths.py +++ b/src/mlwm/tests/test_paths.py @@ -1,8 +1,7 @@ import datetime -import pytest - import mlwm.paths as mlwm_paths +import pytest @pytest.mark.parametrize( From 
94742f831033dbe1dd22847bf205bbc53af1963a Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Thu, 18 Sep 2025 10:58:41 +0200 Subject: [PATCH 03/19] able to load inference datastore and config in neural-lam --- .../surface-dummy-model_DINI/.gitignore | 1 + .../surface-dummy-model_DINI/pyproject.toml | 1 + .../src/run_inference.py | 191 +++++++++++++++--- pyproject.toml | 1 - 4 files changed, 165 insertions(+), 29 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/.gitignore b/configurations/surface-dummy-model_DINI/.gitignore index 987012e..dac143b 100644 --- a/configurations/surface-dummy-model_DINI/.gitignore +++ b/configurations/surface-dummy-model_DINI/.gitignore @@ -2,3 +2,4 @@ *.zarr/ inference_artifact/ *.yaml +inference_workdir/ diff --git a/configurations/surface-dummy-model_DINI/pyproject.toml b/configurations/surface-dummy-model_DINI/pyproject.toml index 9d016cc..df970bf 100644 --- a/configurations/surface-dummy-model_DINI/pyproject.toml +++ b/configurations/surface-dummy-model_DINI/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "zarr>=3.0", "mllam-data-prep", "ipython>=8.37.0", + "neural-lam==0.4.0", ] [dependency-groups] diff --git a/configurations/surface-dummy-model_DINI/src/run_inference.py b/configurations/surface-dummy-model_DINI/src/run_inference.py index e951128..d1e7433 100644 --- a/configurations/surface-dummy-model_DINI/src/run_inference.py +++ b/configurations/surface-dummy-model_DINI/src/run_inference.py @@ -1,14 +1,49 @@ import copy import datetime +from pathlib import Path from typing import Dict +import isodate import mllam_data_prep as mdp import mllam_data_prep.config as mdp_config +import torch import xarray as xr from loguru import logger +from neural_lam.config import NeuralLAMConfig, load_config_and_datastore +from neural_lam.weather_dataset import WeatherDataModule + +FP_TRAINING_CONFIG = "inference_artifact/configs/config.yaml" +FP_TRAINING_DATASTORE_STATS = ( + "inference_artifact/stats/danra.datastore.stats.zarr" 
+) +FP_TRAINING_DATASTORE_CONFIG = ( + "inference_artifact/configs/danra.datastore.yaml" +) + +# XXX: Parameters from training that aren't currently saved to the config, we +# have to hardcode these for now +NUM_PAST_FORCING_STEPS = 1 +NUM_FUTURE_FORCING_STEPS = 1 +# Inference system dependent parameters (larger batch size may require more +# memory, and more workers may require more CPU cores) +BATCH_SIZE = 4 +NUM_WORKERS = 2 + +S3_BUCKET_URL = "https://object-store.os-api.cci1.ecmwf.int/danra" +OVERWRITE_INPUT_PATHS = dict( + danra_surface=f"{S3_BUCKET_URL}/v0.6.0dev1/single_levels.zarr/", + danra_static=f"{S3_BUCKET_URL}/v0.5.0/single_levels.zarr/", +) +ANALYSIS_TIME = "2019-02-04T12:00" +FORECAST_DURATION = datetime.timedelta(hours=6) + +# the path below describes where to save the inference datastore config, +# inference zarr dataset and the inference config for neural-lam itself +FP_INFERENCE_WORKDIR = "inference_workdir" +FP_INFERENCE_DATASTORE_CONFIG = f"{FP_INFERENCE_WORKDIR}/danra.datastore.yaml" +FP_INFERENCE_CONFIG = f"{FP_INFERENCE_WORKDIR}/config.yaml" -@logger.catch(reraise=True) def _create_inference_datastore_config( training_config: mdp.Config, forecast_analysis_time: datetime.datetime, @@ -57,7 +92,7 @@ def _create_inference_datastore_config( time=["analysis_time", "elapsed_forecast_duration"], ) # there will be a single split called "test" - split_name = "test" + # split_name = "test" # which will have a single time slice, given by the analysis time argument # to the script sampling_coord_range = dict( @@ -82,9 +117,28 @@ def _create_inference_datastore_config( # setup the split (test) for the dataset with a coordinate range along the # sampling dimension (analysis_time) of length 1 + # inference_config.output.splitting = mdp_config.Splitting( + # dim=sampling_dim, + # splits={split_name: mdp_config.Split(**sampling_coord_range)}, + # ) + + # XXX: currently (as of 0.4.0) neural-lam requires that `train`, `val` and + # `test` splits are always 
present, even if they are not used. So we + # create empty `train` and `val` splits here inference_config.output.splitting = mdp_config.Splitting( - dim=sampling_dim, - splits={split_name: mdp_config.Split(**sampling_coord_range)}, + dim="time", + splits={ + "train": mdp_config.Split( + start=forecast_analysis_time, end=forecast_analysis_time + ), + "val": mdp_config.Split( + start=forecast_analysis_time, end=forecast_analysis_time + ), + "test": mdp_config.Split( + start=forecast_analysis_time, + end=forecast_analysis_time + forecast_duration, + ), + }, ) # ensure the output data is sampled along the sampling dimension @@ -133,46 +187,127 @@ def _create_inference_datastore_config( return inference_config -def main(): - fp_stats = "inference_artifact/stats/danra.datastore.stats.zarr" - fp_training_datastore = "inference_artifact/configs/danra.datastore.yaml" - - S3_BUCKET_URL = "https://object-store.os-api.cci1.ecmwf.int/danra" - overwrite_input_paths = dict( - danra_surface=f"{S3_BUCKET_URL}/v0.6.0dev1/single_levels.zarr/", - danra_static=f"{S3_BUCKET_URL}/v0.5.0/single_levels.zarr/", - ) - analysis_time = "2019-02-04T12:00" - forecast_duration = datetime.timedelta(hours=6) +def _prepare_inference_dataset_zarr() -> str: + """ + Prepare the inference dataset. - inference_datastore_config_output_fp = "danra.inference.datastore.yaml" + Returns + ------- + str + The path to the inference datastore config file. The inference dataset + is saved as a zarr store in the same directory as the config file, with + the same name but with a .zarr extension instead of .yaml. 
+ """ + if Path(FP_INFERENCE_DATASTORE_CONFIG).exists(): + logger.info( + f"Found existing inference datastore config at " + f"{FP_INFERENCE_DATASTORE_CONFIG}, skipping dataset creation" + ) + return FP_INFERENCE_DATASTORE_CONFIG - ds_stats = xr.open_dataset(fp_stats) + ds_stats = xr.open_dataset(FP_TRAINING_DATASTORE_STATS) logger.debug(f"Opened stats dataset: {ds_stats}") logger.debug( - f"Loading training datastore config from {fp_training_datastore}" + f"Loading training datastore config from {FP_TRAINING_DATASTORE_CONFIG}" ) datastore_training_config = mdp.Config.from_yaml_file( - fp_training_datastore + FP_TRAINING_DATASTORE_CONFIG ) inference_config = _create_inference_datastore_config( training_config=datastore_training_config, - forecast_analysis_time=datetime.datetime.fromisoformat(analysis_time), - forecast_duration=forecast_duration, - overwrite_input_paths=overwrite_input_paths, + forecast_analysis_time=datetime.datetime.fromisoformat(ANALYSIS_TIME), + forecast_duration=FORECAST_DURATION, + overwrite_input_paths=OVERWRITE_INPUT_PATHS, sampling_dim="analysis_time", ) - # save inference config to file - inference_config.to_yaml_file(inference_datastore_config_output_fp) - logger.info( - f"Saved inference datastore config to {inference_datastore_config_output_fp}" + ds = mdp.create_dataset(config=inference_config, ds_stats=ds_stats) + + # neural-lam's convention is to have the same name for the zarr store + # as the config file, but with .zarr extension + fp_dataset = FP_INFERENCE_DATASTORE_CONFIG.replace(".yaml", ".zarr") + + Path(FP_INFERENCE_DATASTORE_CONFIG).parent.mkdir( + parents=True, exist_ok=True ) + inference_config.to_yaml_file(FP_INFERENCE_DATASTORE_CONFIG) + ds.to_zarr(fp_dataset) + logger.info(f"Saved inference dataset to {fp_dataset}") - ds = mdp.create_dataset(config=inference_config, ds_stats=ds_stats) - print(ds) + return FP_INFERENCE_DATASTORE_CONFIG + + +def _create_inference_config(fp_inference_datastore_config: str) -> str: + 
training_config = NeuralLAMConfig.from_yaml_file(FP_TRAINING_CONFIG) + inference_config = copy.deepcopy(training_config) + + # overwrite the path to the datastore config, to point to the + # inference datastore config + inference_config.datastore.config_path = Path( + fp_inference_datastore_config + ).relative_to(Path(FP_INFERENCE_CONFIG).parent) + + # XXX: There is a bug in neural-lam here that means that the datastore kind + # doesn't correctly get serialised to a string in the config file when + # saved to yaml + inference_config.datastore.kind = "mdp" + + inference_config.to_yaml_file(FP_INFERENCE_CONFIG) + logger.info(f"Saved inference config to {FP_INFERENCE_CONFIG}") + + return FP_INFERENCE_CONFIG + + +@logger.catch(reraise=True) +def main(): + fp_inference_datastore_config = _prepare_inference_dataset_zarr() + fp_inference_config = _create_inference_config( + fp_inference_datastore_config=fp_inference_datastore_config + ) + + # Load neural-lam configuration and datastore to use + config, datastore = load_config_and_datastore( + config_path=fp_inference_config + ) + + # XXX: hardcoded timestep from DANRA right now, this should be inferred + # from the dataset itself probably. neural-lam wants to know the number of + # steps for the autoregressive prediction, not the total duration. 
+ ar_steps_eval = FORECAST_DURATION / isodate.parse_duration("PT3H") + + # Create datamodule + data_module = WeatherDataModule( + datastore=datastore, + ar_steps_train=0, + ar_steps_eval=ar_steps_eval, + standardize=True, + num_past_forcing_steps=NUM_PAST_FORCING_STEPS, + num_future_forcing_steps=NUM_FUTURE_FORCING_STEPS, + batch_size=BATCH_SIZE, + num_workers=NUM_WORKERS, + eval_split="test", + ) + + # Instantiate model + trainer + if torch.cuda.is_available(): + device_name = "cuda" + torch.set_float32_matmul_precision( + "high" + ) # Allows using Tensor Cores on A100s + else: + device_name = "cpu" + + devices = "auto" + + # + # ModelClass = nl.models.HiLAM + # model = ModelClass(args, config=config, datastore=datastore) + + assert data_module.eval_dataloader() is not None + assert device_name is not None + assert devices is not None if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 7d0ddf4..ea9f9b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,6 @@ readme = "README.md" requires-python = ">=3.10" dependencies = [ "parse>=1.20.2", - "dask>=2025.4.1", "dotenv>=0.9.9", "ipdb>=0.13.13", "mllam-data-prep>=0.6.0", From 081e60581c3bc2a4fd34b0485ba164bf90afe80d Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Thu, 18 Sep 2025 14:08:25 +0200 Subject: [PATCH 04/19] wip on inference run --- .../surface-dummy-model_DINI/pyproject.toml | 3 +- .../src/run_inference.py | 41 +++- src/mlwm/create_forecast_inference_dataset.py | 227 ------------------ 3 files changed, 39 insertions(+), 232 deletions(-) delete mode 100644 src/mlwm/create_forecast_inference_dataset.py diff --git a/configurations/surface-dummy-model_DINI/pyproject.toml b/configurations/surface-dummy-model_DINI/pyproject.toml index df970bf..74be5ce 100644 --- a/configurations/surface-dummy-model_DINI/pyproject.toml +++ b/configurations/surface-dummy-model_DINI/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "zarr>=3.0", "mllam-data-prep", "ipython>=8.37.0", - 
"neural-lam==0.4.0", + "neural-lam", ] [dependency-groups] @@ -29,6 +29,7 @@ profile = "black" [tool.uv.sources] mllam-data-prep = { git = "https://github.com/leifdenby/mllam-data-prep", rev = "feat/inference-cli-args" } +neural-lam = { git = "https://github.com/leifdenby/neural-lam", rev = "feat/mdp-datastore-support-forecast-data" } [build-system] requires = ["setuptools>=61", "setuptools_scm"] build-backend = "setuptools.build_meta" diff --git a/configurations/surface-dummy-model_DINI/src/run_inference.py b/configurations/surface-dummy-model_DINI/src/run_inference.py index d1e7433..5fe62b4 100644 --- a/configurations/surface-dummy-model_DINI/src/run_inference.py +++ b/configurations/surface-dummy-model_DINI/src/run_inference.py @@ -6,9 +6,11 @@ import isodate import mllam_data_prep as mdp import mllam_data_prep.config as mdp_config +import pytorch_lightning as pl import torch import xarray as xr from loguru import logger +from neural_lam import models as nl_models from neural_lam.config import NeuralLAMConfig, load_config_and_datastore from neural_lam.weather_dataset import WeatherDataModule @@ -24,6 +26,7 @@ # have to hardcode these for now NUM_PAST_FORCING_STEPS = 1 NUM_FUTURE_FORCING_STEPS = 1 +MODEL_CLASS = nl_models.GraphLAM # Inference system dependent parameters (larger batch size may require more # memory, and more workers may require more CPU cores) BATCH_SIZE = 4 @@ -287,7 +290,6 @@ def main(): num_future_forcing_steps=NUM_FUTURE_FORCING_STEPS, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, - eval_split="test", ) # Instantiate model + trainer @@ -301,14 +303,45 @@ def main(): devices = "auto" - # - # ModelClass = nl.models.HiLAM - # model = ModelClass(args, config=config, datastore=datastore) + class ModelArgs: + output_std = None + # XXX: we shouldn't have to set a loss function when we're only doing + # inference, but neural-lam currently requires it + loss = "mse" + restore_opt = False + n_example_pred = 1 + lr = None + + graph = "inference-graph" 
+ hidden_dim = 4 + hidden_layers = 1 + processor_layers = 2 + mesh_aggr = "sum" + val_steps_to_log = [1, 3] + metrics_watch = [] + num_past_forcing_steps = NUM_PAST_FORCING_STEPS + num_future_forcing_steps = NUM_FUTURE_FORCING_STEPS + + model_args = ModelArgs() + model = MODEL_CLASS(model_args, config=config, datastore=datastore) assert data_module.eval_dataloader() is not None assert device_name is not None assert devices is not None + trainer = pl.Trainer( + max_epochs=1, + deterministic=True, + accelerator=device_name, + devices=devices, + log_every_n_steps=1, + # use `detect_anomaly` to ensure that we don't have NaNs popping up + # during inference + detect_anomaly=True, + ) + + trainer.test(model=model, datamodule=data_module) + if __name__ == "__main__": import ipdb diff --git a/src/mlwm/create_forecast_inference_dataset.py b/src/mlwm/create_forecast_inference_dataset.py deleted file mode 100644 index c7b10db..0000000 --- a/src/mlwm/create_forecast_inference_dataset.py +++ /dev/null @@ -1,227 +0,0 @@ -import datetime -import tempfile -from pathlib import Path - -import isodate -import rich -import xarray as xr -from loguru import logger - -from . import config as mdp_config -from .create_dataset import create_dataset - - -def parse_key_value_arg(arg: str) -> tuple[str, str]: - """ - Parse a single key=value argument into a dictionary. - - This function is intended for use with argparse's `type=` argument when parsing - command-line arguments like: --overwrite-source-paths key1=value1 key2=value2 - - Args: - arg (str): A string in the format key=value. - - Returns: - Dict[str, str]: A dictionary with one key-value pair parsed from the - input string. - - Raises: - argparse.ArgumentTypeError: If the argument is not in key=value format. - """ - import argparse - - if "=" not in arg: - raise argparse.ArgumentTypeError( - f"Invalid format: '{arg}'. Expected key=value." 
- ) - - key, value = arg.split("=", 1) - return (key, value) - - -def create_forecast_inference_dataset( - fp_config: str, - analysis_time: datetime.datetime, - overwrite_input_paths: dict, - use_stats_from_path: str, -): - """ - Create forecasting prediction dataset derived from a config file used during - training. In creating the inference dataset, it is assumed that the `time` - dimension of all input datasets used should be replaced by the - `analysis_time` and `elapsed_forecast_duration` dimensions. - """ - - # the new sampling dimension is `analysis_time` - old_sampling_dim = "time" - sampling_dim = "analysis_time" - # instead of only having `time` as dimension, the input forecast datasets - # have two dimensions that describe the time value [analysis_time, - # elapsed_forecast_duration] - dim_replacements = dict( - time=["analysis_time", "elapsed_forecast_duration"], - ) - # there will be a single split called "test" - split_name = "test" - # which will have a single time slice, given by the analysis time argument - # to the script - sampling_coord_range = dict( - start=analysis_time, - end=analysis_time, - ) - - # load and modify the original config file - config = mdp_config.Config.from_yaml_file(file=fp_config) - - if overwrite_input_paths: - for key, value in overwrite_input_paths.items(): - if key not in config.inputs: - raise ValueError( - f"Key {key} not found in config inputs. 
" - f"Available keys are: {list(config.inputs.keys())}" - ) - logger.info( - f"Overwriting input path for {key} with {value} previously " - f"{config.inputs[key].path}" - ) - config.inputs[key].path = value - - # setup the split (test) for the dataset with a coordinate range along the - # sampling dimension (analysis_time) of length 1 - config.output.splitting = mdp_config.Splitting( - dim=sampling_dim, - splits={split_name: mdp_config.Split(**sampling_coord_range)}, - ) - - # ensure the output data is sampled along the sampling dimension - # (analysis_time) too - config.output.coord_ranges = { - sampling_dim: analysis_time # mdp_config.Range(**sampling_coord_range) - } - - config.output.chunking = {sampling_dim: 1} - - # replace old sampling_dimension (time) dimension in outputs with - # [`analysis_time`, `elapsed_forecast_time`] - for variable, dims in config.output.variables.items(): - if old_sampling_dim in dims: - orig_sampling_dim_index = dims.index(old_sampling_dim) - dims.remove(old_sampling_dim) - for dim in dim_replacements[old_sampling_dim][::-1]: - dims.insert(orig_sampling_dim_index, dim) - config.output.variables[variable] = dims - logger.info( - f"Replaced {old_sampling_dim} dimension with " - f"{dim_replacements[old_sampling_dim]} for {variable}" - ) - - # these dimensions should also be "renamed" from the input datasets - for input_name in config.inputs.keys(): - if "time" in config.inputs[input_name].dim_mapping: - dims = config.inputs[input_name].dims - orig_sampling_dim_index = dims.index(old_sampling_dim) - dims.remove(old_sampling_dim) - for dim in dim_replacements[old_sampling_dim][::-1]: - dims.insert(orig_sampling_dim_index, dim) - config.inputs[input_name].dims = dims - - del config.inputs[input_name].dim_mapping[old_sampling_dim] - - # add new "rename" dim-mappins for `analysis_time` and - # `elapsed_forecast_duration` - for dim in dim_replacements[old_sampling_dim]: - config.inputs[input_name].dim_mapping[ - dim - ] = 
mdp_config.DimMapping(method="rename", dim=dim) - - # save config to temporary filepath - tmpfile = tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") - fp_config_temp = Path(tmpfile.name) - config.to_yaml_file(fp_config_temp) - logger.info(f"Temporary config file created at {fp_config_temp}") - - rich.print(config) - - if use_stats_from_path is not None: - ds_stats = xr.open_zarr(use_stats_from_path) - - ds = create_dataset(config=config, ds_stats=ds_stats) - - return ds, config - - -def cli(argv=None): - import argparse - - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("config", help="Path to the config file", type=Path) - parser.add_argument( - "-o", - "--output", - help="Path to the output zarr file", - type=Path, - default=None, - ) - - parser.add_argument( - "--overwrite-input-paths", - nargs="*", - type=parse_key_value_arg, - help=( - "List of key=value pairs used to overwrite input paths of named " - "inputs in the config file. For example: --overwrite-input-paths " - "danra_surface=s3://mybucket/2025-05-01T1200Z/danra_surface.zarr " - "danra_height_levels=s3://mybucket/2025-05-01T1200Z/" - "danra_height_levels.zarr" - ), - ) - parser.add_argument( - "--analysis_time", - required=True, - help="Analysis time to use for the dataset. This is used to select the " - "correct time slice from the input data.", - type=isodate.parse_datetime, - ) - parser.add_argument( - "--use-stats-from-path", - help="Path to zarr dataset with stats to use in the new dataset. 
" - "Using the option will cause mllam-data-prep to skip calculating " - "stats and instead use the stats from the provided path.", - type=Path, - default=None, - ) - - args = parser.parse_args(argv) - - analysis_time = args.analysis_time - use_stats_from_path = args.use_stats_from_path - fp_config = Path(args.config) - - # Convert the list of tuples to a dictionary - overwrite_input_paths = dict(args.overwrite_input_paths) - - ds, config = create_forecast_inference_dataset( - analysis_time=analysis_time, - fp_config=fp_config, - overwrite_input_paths=overwrite_input_paths, - use_stats_from_path=use_stats_from_path, - ) - - dataset_name = ( - f"{Path(fp_config).name}." - f"{isodate.datetime_isoformat(analysis_time).replace(':','')}" - ) - fp_config_inference = f"{dataset_name}.yaml" - fp_dataset_inference = f"{dataset_name}.zarr" - - logger.info(f"Writing inference dataset to {fp_dataset_inference}") - ds.to_zarr(fp_dataset_inference, mode="w", consolidated=True) - - logger.info(f"Writing inference config to {fp_config_inference}") - config.to_yaml_file(fp_config_inference) - - -if __name__ == "__main__": - cli() From 316ccb596d29aa1bf446e7afea1c57b2e4af9998 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Thu, 25 Sep 2025 16:52:41 +0200 Subject: [PATCH 05/19] first working inference entry-point! 
--- .../surface-dummy-model_DINI/README.md | 71 ++++++++++++++++ .../surface-dummy-model_DINI/entry.sh | 69 +++++++++------ .../surface-dummy-model_DINI/pyproject.toml | 10 +-- ...ference.py => create_inference_dataset.py} | 85 +------------------ pyproject.toml | 7 +- 5 files changed, 128 insertions(+), 114 deletions(-) rename configurations/surface-dummy-model_DINI/src/{run_inference.py => create_inference_dataset.py} (80%) diff --git a/configurations/surface-dummy-model_DINI/README.md b/configurations/surface-dummy-model_DINI/README.md index e69de29..262a625 100644 --- a/configurations/surface-dummy-model_DINI/README.md +++ b/configurations/surface-dummy-model_DINI/README.md @@ -0,0 +1,71 @@ +# surface-dummy-model_DANRA + +The model configuration in this directory is a dummy model that was trained on +surface variables from DANRA, only 10 days of data and only trained 10 +epochs. It is intended only as a demonstration of the inference pipeline and is +expected to give very poor results. + +## Upstream package change requirements + +Relative to the `main` branch on both github.com/mllam/mllam-data-prep and +github.com/mllam/neural-lam a number of pieces of functionality are currently +required to run this configuration: + +**mllam-data-prep**: + +using branch `feat/inference-cli-args` on +https://github.com/leifdenby/mllam-data-prep@feat/inference-cli-args, which adds: + +- functionality to invert datasets created by `mllam-data-prep` back to the + structure of the input datasets that were used. In the current + configuration that is used to restructure the forecast zarr dataset that + `neural-lam` outputs during inference back to the structure of the input + forecast dataset. + + - also in separate branch and PR: https://github.com/leifdenby/mllam-data-prep/tree/feat/inverse-ops + +- use of cf-compliant encoding of `xarray/pandas` `MultiIndex` coordinates to + store stacked coordinates. 
This is required since `MultiIndex` coordinates + can't natively be stored in zarr/netcdf files, but fortunately `cf_xarray` + have implemented the cf-compliant way of handling this (see + https://cf-xarray.readthedocs.io/en/latest/coding.html) + + - needs its own branch and PR + +- support for supplying statistics from the training dataset during creation of + the inference dataset, so that the inference dataset can be normalised in the + same way as the training dataset. + + - needs its own branch and PR + +- support for selecting only a single value from a variable/coordinate in the + configuration. This is used to select only a single analysis time during + creation of the inference dataset. + + - needs its own branch and PR + + +**neural-lam**: + +using branch `dev/first-inference-image` on +https://github.com/leifdenby/neural-lam/tree/dev/first-inference-image, which +adds: + +- support for decoding cf-compliant `MultiIndex` encoded coordinates when reading + datasets produced with mllam-data-prep. + + - this needs its own branch and PR, and needs to be implemented so datasets + made with previous versions of `mllam-data-prep` are still usable in `neural-lam` + +- support for writing output from inference (i.e. `--eval` mode) to a zarr + dataset. Needs to be merged after the multiindex decoding above. + + - also in separate branch and PR: https://github.com/leifdenby/neural-lam/tree/feat/write-to-zarr + +- support for using forecast data in the mllam-data-prep datastore (`MDPDatastore`) + + - needs its own branch and PR + +- make logging of validation steps optional in the training CLI (i.e. 
`--eval` mode) + + - needs its own branch and PR diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index 376ba89..e7732d4 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -5,42 +5,56 @@ # container image build that the inference artifact was unpacked to # inference_artifact/ - -INFERENCE_ARTIFACT_PATH="./inference_artifact" -# XXX: these mount points could come from config.yaml for the model run configuration -INPUT_DATASETS_ROOT_PATH="/volume/inputs" -OUTPUT_DATASETS_ROOT_PATH="/volume/outputs" +# make this script fail on any error +set -e # forecast out to 18 hours, which means 6 steps of 3 hours each (the model was # trained on 3-hourly analysis data) NUM_EVAL_STEPS=6 +# model specific parameters, ideally these would come from some config +NUM_HIDDEN_DIMS=2 +GRAPH_NAME="multiscale" +HIEARCHICAL_GRAPH=false + +if [ "$HIEARCHICAL_GRAPH" = true ] ; then + CREATE_GRAPH_ARG="--hierarchical" +else + CREATE_GRAPH_ARG="" +fi + +INFERENCE_ARTIFACT_PATH="./inference_artifact" +INFERENCE_WORK_PATH="./inference_workdir" + +# XXX: these mount points could come from config.yaml for the model run configuration +INPUT_DATASETS_ROOT_PATH="${INFERENCE_WORK_PATH}/inputs" +OUTPUT_DATASETS_ROOT_PATH="${INFERENCE_WORK_PATH}/outputs" + +mkdir -p ${OUTPUT_DATASETS_ROOT_PATH} + ## 1. Create inference dataset # This uses a cli stored within mlwm to called mllam-data-prep to create the # inference dataset. The inference dataset is created by modifying the -# configuration used during training to a) change the paths to the input datasets, -# b) include the statistics from the training dataset and c) set the dimensions -# in the configuration to have `analysis_time` and `elapsed_forecast_duration` -# instead of just `time`. 
-uv run python -m mlwm.create_inference_dataset \ - --config_path ${INFERENCE_ARTIFACT_PATH}/config.yaml \ - --override_input_paths \ - danra_surface=${INPUT_DATASETS_ROOT_PATH}/single_levels.zarr \ - danra_surface_forcing=${INPUT_DATASETS_ROOT_PATH}/single_levels.zarr \ - danra_static=${INPUT_DATASETS_ROOT_PATH}/single_levels.zarr \ - --use_stats_from_path ${INFERENCE_ARTIFACT_PATH}/danra.datastore.stats.zarr \ - --output_root_path inference/ +# configuration used during training to +# a) change the paths to the input datasets, +# b) include the statistics from the training dataset and +# c) set the dimensions in the configuration to have `analysis_time` and +# `elapsed_forecast_duration` instead of just `time`. +uv run python src/create_inference_dataset.py ## 2. Create graph -uv run python -m neural_lam.create_graph --config_path inference/config.yaml +# TODO: could cache this, although that isn't implemented at the moment +# uv run python -m neural_lam.create_graph --config_path ${INFERENCE_WORK_PATH}/config.yaml \ +# --name ${GRAPH_NAME} ${CREATE_GRAPH_ARG} ## 3. Run inference -uv run python -m neural_lam.train_model --config_path inference/config.yaml \ - --eval \ - --graph multiscale \ - --hidden_dim 2 \ +uv run python -m neural_lam.train_model --config_path ${INFERENCE_WORK_PATH}/config.yaml \ + --eval test\ + --graph ${GRAPH_NAME} \ + --hidden_dim ${NUM_HIDDEN_DIMS} \ --ar_steps_eval ${NUM_EVAL_STEPS} \ - --load ${INFERENCE_ARTIFACT_PATH}/checkpoint.ckpt \ + --val_steps_to_log \ + --load ${INFERENCE_ARTIFACT_PATH}/checkpoint.pkl \ --save_eval_to_zarr_path ${OUTPUT_DATASETS_ROOT_PATH}/inference_output.zarr ## 4. Transform inference output back to original grid and variables @@ -49,5 +63,10 @@ uv run python -m neural_lam.train_model --config_path inference/config.yaml \ # means that we will have `danra_surface.zarr` in this case. 
We rename name # that manually here but maybe mllam-data-prep should be able to merge inputs # originating from the same zarr dataset path? -uv run python -m mllam_data_prep.recreate_inputs ${OUTPUT_DATASETS_ROOT_PATH}/inference_output.zarr -rename ${OUTPUT_DATASETS_ROOT_PATH}/danra_surface.zarr ${OUTPUT_DATASETS_ROOT_PATH}/single_levels.zarr +uv run python -m mllam_data_prep.recreate_inputs \ + --config-path ${INFERENCE_WORK_PATH}/danra.datastore.yaml \ + --output-path-format "${OUTPUT_DATASETS_ROOT_PATH}/{input_name}.zarr" \ + ${OUTPUT_DATASETS_ROOT_PATH}/inference_output.zarr + +echo "Renaming ${OUTPUT_DATASETS_ROOT_PATH}/danra_surface.zarr to ${OUTPUT_DATASETS_ROOT_PATH}/single_levels.zarr" +mv ${OUTPUT_DATASETS_ROOT_PATH}/danra_surface.zarr ${OUTPUT_DATASETS_ROOT_PATH}/single_levels.zarr diff --git a/configurations/surface-dummy-model_DINI/pyproject.toml b/configurations/surface-dummy-model_DINI/pyproject.toml index 020e404..83c9ec6 100644 --- a/configurations/surface-dummy-model_DINI/pyproject.toml +++ b/configurations/surface-dummy-model_DINI/pyproject.toml @@ -13,8 +13,8 @@ dependencies = [ "tqdm>=4.67.1", "universal-pathlib>=0.2.6", "zarr>=3.0", - "mllam-data-prep", "ipython>=8.37.0", + "mllam-data-prep", "neural-lam", ] @@ -26,10 +26,6 @@ dev = [ [tool.isort] profile = "black" - -[tool.uv.sources] -mllam-data-prep = { git = "https://github.com/leifdenby/mllam-data-prep", rev = "feat/inference-cli-args" } -neural-lam = { git = "https://github.com/leifdenby/neural-lam", rev = "feat/mdp-datastore-support-forecast-data" } [build-system] requires = ["setuptools>=61", "setuptools_scm"] build-backend = "setuptools.build_meta" @@ -40,3 +36,7 @@ include-package-data = true [tool.setuptools.packages.find] where = ["src"] + +[tool.uv.sources] +mllam-data-prep = { git = "https://github.com/leifdenby/mllam-data-prep", rev = "feat/inference-cli-args" } +neural-lam = { git = "https://github.com/leifdenby/neural-lam", rev = "dev/first-inference-image" } diff --git 
a/configurations/surface-dummy-model_DINI/src/run_inference.py b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py similarity index 80% rename from configurations/surface-dummy-model_DINI/src/run_inference.py rename to configurations/surface-dummy-model_DINI/src/create_inference_dataset.py index 5fe62b4..ec1e282 100644 --- a/configurations/surface-dummy-model_DINI/src/run_inference.py +++ b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py @@ -3,16 +3,12 @@ from pathlib import Path from typing import Dict -import isodate import mllam_data_prep as mdp import mllam_data_prep.config as mdp_config -import pytorch_lightning as pl -import torch import xarray as xr from loguru import logger from neural_lam import models as nl_models -from neural_lam.config import NeuralLAMConfig, load_config_and_datastore -from neural_lam.weather_dataset import WeatherDataModule +from neural_lam.config import NeuralLAMConfig FP_TRAINING_CONFIG = "inference_artifact/configs/config.yaml" FP_TRAINING_DATASTORE_STATS = ( @@ -266,85 +262,10 @@ def _create_inference_config(fp_inference_datastore_config: str) -> str: @logger.catch(reraise=True) def main(): fp_inference_datastore_config = _prepare_inference_dataset_zarr() - fp_inference_config = _create_inference_config( + _create_inference_config( fp_inference_datastore_config=fp_inference_datastore_config ) - # Load neural-lam configuration and datastore to use - config, datastore = load_config_and_datastore( - config_path=fp_inference_config - ) - - # XXX: hardcoded timestep from DANRA right now, this should be inferred - # from the dataset itself probably. neural-lam wants to know the number of - # steps for the autoregressive prediction, not the total duration. 
- ar_steps_eval = FORECAST_DURATION / isodate.parse_duration("PT3H") - - # Create datamodule - data_module = WeatherDataModule( - datastore=datastore, - ar_steps_train=0, - ar_steps_eval=ar_steps_eval, - standardize=True, - num_past_forcing_steps=NUM_PAST_FORCING_STEPS, - num_future_forcing_steps=NUM_FUTURE_FORCING_STEPS, - batch_size=BATCH_SIZE, - num_workers=NUM_WORKERS, - ) - - # Instantiate model + trainer - if torch.cuda.is_available(): - device_name = "cuda" - torch.set_float32_matmul_precision( - "high" - ) # Allows using Tensor Cores on A100s - else: - device_name = "cpu" - - devices = "auto" - - class ModelArgs: - output_std = None - # XXX: we shouldn't have to set a loss function when we're only doing - # inference, but neural-lam currently requires it - loss = "mse" - restore_opt = False - n_example_pred = 1 - lr = None - - graph = "inference-graph" - hidden_dim = 4 - hidden_layers = 1 - processor_layers = 2 - mesh_aggr = "sum" - val_steps_to_log = [1, 3] - metrics_watch = [] - num_past_forcing_steps = NUM_PAST_FORCING_STEPS - num_future_forcing_steps = NUM_FUTURE_FORCING_STEPS - - model_args = ModelArgs() - model = MODEL_CLASS(model_args, config=config, datastore=datastore) - - assert data_module.eval_dataloader() is not None - assert device_name is not None - assert devices is not None - - trainer = pl.Trainer( - max_epochs=1, - deterministic=True, - accelerator=device_name, - devices=devices, - log_every_n_steps=1, - # use `detect_anomaly` to ensure that we don't have NaNs popping up - # during inference - detect_anomaly=True, - ) - - trainer.test(model=model, datamodule=data_module) - if __name__ == "__main__": - import ipdb - - with ipdb.launch_ipdb_on_exception(): - main() + main() diff --git a/pyproject.toml b/pyproject.toml index 7d8b728..d3d52af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "mlwm-deployment" dynamic = ["version"] description = "Add your description here" readme = "README.md" -requires-python = ">=3.10" 
+requires-python = ">=3.11" dependencies = [ "parse>=1.20.2", "dask>=2025.4.1", @@ -15,11 +15,14 @@ dependencies = [ "s3fs>=2025.3.2", "tqdm>=4.67.1", "universal-pathlib>=0.2.6", - "zarr", + "zarr>=3.0.0", + "xarray>=2025.5.0", ] [dependency-groups] dev = [ + "ipykernel>=6.30.1", + "jinja2>=3.1.6", "pre-commit>=4.2.0", "pytest>=8.3.5", ] From 91b7104850e2a8580675fa7defb676da1a6ed2a8 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Thu, 25 Sep 2025 16:54:24 +0200 Subject: [PATCH 06/19] cleanup --- .../surface-dummy-model_DINI/datastore.yaml | 165 ------------------ .../surface-dummy-model_DINI/meta.yaml | 0 2 files changed, 165 deletions(-) delete mode 100644 configurations/surface-dummy-model_DINI/datastore.yaml delete mode 100644 configurations/surface-dummy-model_DINI/meta.yaml diff --git a/configurations/surface-dummy-model_DINI/datastore.yaml b/configurations/surface-dummy-model_DINI/datastore.yaml deleted file mode 100644 index fcf227f..0000000 --- a/configurations/surface-dummy-model_DINI/datastore.yaml +++ /dev/null @@ -1,165 +0,0 @@ -schema_version: v0.5.0 -dataset_version: v0.1.0 - -output: - variables: - static: [grid_index, static_feature] - state: [time, grid_index, state_feature] - forcing: [time, grid_index, forcing_feature] - coord_ranges: - time: - start: - end: - step: PT3H - chunking: - time: 1 - state_feature: 20 - splitting: - dim: time - splits: - train: - start: 2000-01-01T00:00 - end: 2018-10-29T00:00 - compute_statistics: - ops: [mean, std, diff_mean, diff_std] - dims: [grid_index, time] - val: - start: 2018-11-05T00:00 - end: 2019-10-22T00:00 - test: - start: 2019-10-29T00:00 - end: 2020-10-29T00:00 - -inputs: - danra_sl_state: - path: /harmonie_cy40/danra/w12p05_s45p65_e24p52_n64p40/dx2p5km_dy2p5km//single_levels.zarr/ - dims: [time, x, y] - variables: - - pres_seasurface - - t2m - - u10m - - v10m - - pres0m - - lwavr0m - - swavr0m - dim_mapping: - time: - method: rename - dim: time - grid_index: - method: stack - dims: [x, y] - 
state_feature: - method: stack_variables_by_var_name - name_format: "{var_name}" - target_output_variable: state - - danra_pl_state: - path: /harmonie_cy40/danra/w12p05_s45p65_e24p52_n64p40/dx2p5km_dy2p5km//pressure_levels.zarr/ - dims: [time, x, y, pressure] - variables: - z: - pressure: - values: [100, 200, 400, 600, 700, 850, 925, 1000,] - units: hPa - t: - pressure: - values: [100, 200, 400, 600, 700, 850, 925, 1000,] - units: hPa - r: - pressure: - values: [100, 200, 400, 600, 700, 850, 925, 1000,] - units: hPa - u: - pressure: - values: [100, 200, 400, 600, 700, 850, 925, 1000,] - units: hPa - v: - pressure: - values: [100, 200, 400, 600, 700, 850, 925, 1000,] - units: hPa - tw: - pressure: - values: [100, 200, 400, 600, 700, 850, 925, 1000,] - units: hPa - dim_mapping: - time: - method: rename - dim: time - state_feature: - method: stack_variables_by_var_name - dims: [pressure] - name_format: "{var_name}{pressure}" - grid_index: - method: stack - dims: [x, y] - target_output_variable: state - - danra_static: - path: /harmonie_cy40/danra/w12p05_s45p65_e24p52_n64p40/dx2p5km_dy2p5km//single_levels.zarr/ - dims: [x, y] - variables: - - lsm - - orography - dim_mapping: - grid_index: - method: stack - dims: [x, y] - static_feature: - method: stack_variables_by_var_name - name_format: "{var_name}" - target_output_variable: static - - danra_forcing: - path: /harmonie_cy40/danra/w12p05_s45p65_e24p52_n64p40/dx2p5km_dy2p5km//single_levels.zarr/ - dims: [time, x, y] - derived_variables: - # derive variables to be used as forcings - toa_radiation: - kwargs: - time: ds_input.time - lat: ds_input.lat - lon: ds_input.lon - function: mllam_data_prep.ops.derive_variable.physical_field.calculate_toa_radiation - hour_of_day_sin: - kwargs: - time: ds_input.time - component: sin - function: mllam_data_prep.ops.derive_variable.time_components.calculate_hour_of_day - hour_of_day_cos: - kwargs: - time: ds_input.time - component: cos - function: 
mllam_data_prep.ops.derive_variable.time_components.calculate_hour_of_day - day_of_year_sin: - kwargs: - time: ds_input.time - component: sin - function: mllam_data_prep.ops.derive_variable.time_components.calculate_day_of_year - day_of_year_cos: - kwargs: - time: ds_input.time - component: cos - function: mllam_data_prep.ops.derive_variable.time_components.calculate_day_of_year - dim_mapping: - time: - method: rename - dim: time - grid_index: - method: stack - dims: [x, y] - forcing_feature: - method: stack_variables_by_var_name - name_format: "{var_name}" - target_output_variable: forcing - -extra: - projection: - class_name: LambertConformal - kwargs: - central_longitude: 25.0 - central_latitude: 56.7 - standard_parallels: [56.7, 56.7] - globe: - semimajor_axis: 6367470.0 - semiminor_axis: 6367470.0 diff --git a/configurations/surface-dummy-model_DINI/meta.yaml b/configurations/surface-dummy-model_DINI/meta.yaml deleted file mode 100644 index e69de29..0000000 From d0b6accf1ac0918ffcafd516fcdf310284d7c1b0 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Thu, 25 Sep 2025 17:02:44 +0200 Subject: [PATCH 07/19] more cleanup --- .../src/create_inference_dataset.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py index ec1e282..9497d97 100644 --- a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py +++ b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py @@ -7,7 +7,6 @@ import mllam_data_prep.config as mdp_config import xarray as xr from loguru import logger -from neural_lam import models as nl_models from neural_lam.config import NeuralLAMConfig FP_TRAINING_CONFIG = "inference_artifact/configs/config.yaml" @@ -18,16 +17,6 @@ "inference_artifact/configs/danra.datastore.yaml" ) -# XXX: Parameters from training that aren't currently saved to the config, we -# have to 
hardcode these for now -NUM_PAST_FORCING_STEPS = 1 -NUM_FUTURE_FORCING_STEPS = 1 -MODEL_CLASS = nl_models.GraphLAM -# Inference system dependent parameters (larger batch size may require more -# memory, and more workers may require more CPU cores) -BATCH_SIZE = 4 -NUM_WORKERS = 2 - S3_BUCKET_URL = "https://object-store.os-api.cci1.ecmwf.int/danra" OVERWRITE_INPUT_PATHS = dict( danra_surface=f"{S3_BUCKET_URL}/v0.6.0dev1/single_levels.zarr/", From 2982d4c919f3c4b954566a0231d9f93f112af07e Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Fri, 26 Sep 2025 09:22:32 +0200 Subject: [PATCH 08/19] more cleanup #2 --- configurations/surface-dummy-model_DINI/README.md | 2 +- configurations/surface-dummy-model_DINI/entry.sh | 4 ++-- configurations/surface-dummy-model_DINI/pyproject.toml | 4 ++++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/README.md b/configurations/surface-dummy-model_DINI/README.md index 262a625..16a0b0d 100644 --- a/configurations/surface-dummy-model_DINI/README.md +++ b/configurations/surface-dummy-model_DINI/README.md @@ -1,4 +1,4 @@ -# surface-dummy-model_DANRA +# surface-dummy-model_DINI The model configuration in this directory is a dummy model that was trained on surface variables from DANRA, only 10 days of data and only trained 10 diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index e7732d4..b1d9528 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -44,8 +44,8 @@ uv run python src/create_inference_dataset.py ## 2. 
Create graph # TODO: could cache this, although that isn't implemented at the moment -# uv run python -m neural_lam.create_graph --config_path ${INFERENCE_WORK_PATH}/config.yaml \ -# --name ${GRAPH_NAME} ${CREATE_GRAPH_ARG} +uv run python -m neural_lam.create_graph --config_path ${INFERENCE_WORK_PATH}/config.yaml \ + --name ${GRAPH_NAME} ${CREATE_GRAPH_ARG} ## 3. Run inference uv run python -m neural_lam.train_model --config_path ${INFERENCE_WORK_PATH}/config.yaml \ diff --git a/configurations/surface-dummy-model_DINI/pyproject.toml b/configurations/surface-dummy-model_DINI/pyproject.toml index 83c9ec6..51a7c97 100644 --- a/configurations/surface-dummy-model_DINI/pyproject.toml +++ b/configurations/surface-dummy-model_DINI/pyproject.toml @@ -4,6 +4,10 @@ version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" +authors = [ + {name = "Leif Denby", email = "lcd@dmi.dk"}, + {name = "Kasper Hintz", email = "kah@dmi.dk"}, +] dependencies = [ "parse>=1.20.2", "dask>=2025.4.1", From ff7966c9023b6b825246d7a81697021177ab7a0f Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Fri, 26 Sep 2025 09:55:32 +0200 Subject: [PATCH 09/19] remove src from pyproject.toml --- configurations/surface-dummy-model_DINI/pyproject.toml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/pyproject.toml b/configurations/surface-dummy-model_DINI/pyproject.toml index 51a7c97..1a0777c 100644 --- a/configurations/surface-dummy-model_DINI/pyproject.toml +++ b/configurations/surface-dummy-model_DINI/pyproject.toml @@ -34,13 +34,6 @@ profile = "black" requires = ["setuptools>=61", "setuptools_scm"] build-backend = "setuptools.build_meta" -[tool.setuptools] -package-dir = {"" = "src"} -include-package-data = true - -[tool.setuptools.packages.find] -where = ["src"] - [tool.uv.sources] mllam-data-prep = { git = "https://github.com/leifdenby/mllam-data-prep", rev = "feat/inference-cli-args" } neural-lam = { git = 
"https://github.com/leifdenby/neural-lam", rev = "dev/first-inference-image" } From 9dcfa6a1b5d0e9ccdbba9cde4e2b94449e44cc11 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Fri, 26 Sep 2025 10:00:47 +0200 Subject: [PATCH 10/19] include src/ in container image --- configurations/surface-dummy-model_DINI/Containerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/configurations/surface-dummy-model_DINI/Containerfile b/configurations/surface-dummy-model_DINI/Containerfile index a6744fe..889bf36 100644 --- a/configurations/surface-dummy-model_DINI/Containerfile +++ b/configurations/surface-dummy-model_DINI/Containerfile @@ -5,6 +5,7 @@ WORKDIR /workspace COPY pyproject.toml . COPY *.yaml ./ COPY entry.sh ./ +COPY src/ ./src # Download inference artifact from S3 ARG DEFAULT_ARTIFACT="s3://mlwm-artifacts/inference-artifacts/surface-dummy-model_DINI.zip" From 16f5c99bcffa9bb6d88eac8c77c6b710a523d379 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Fri, 26 Sep 2025 14:22:15 +0200 Subject: [PATCH 11/19] disable wandb --- configurations/surface-dummy-model_DINI/entry.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index b1d9528..6e99bc4 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -32,6 +32,10 @@ OUTPUT_DATASETS_ROOT_PATH="${INFERENCE_WORK_PATH}/outputs" mkdir -p ${OUTPUT_DATASETS_ROOT_PATH} +# disable weights and biases logging, without this --eval with neural-lam fails +# because it tries to set up the logging and there is no WANDB_API_KEY set +uv run wandb disabled + ## 1. Create inference dataset # This uses a cli stored within mlwm to called mllam-data-prep to create the # inference dataset. 
The inference dataset is created by modifying the From 5f56aefc798ccecf3808ebba3e63d3811f96761f Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Fri, 26 Sep 2025 16:28:42 +0200 Subject: [PATCH 12/19] move runtime args to env vars and support multiple datastores --- .../surface-dummy-model_DINI/.gitignore | 1 + .../surface-dummy-model_DINI/entry.sh | 52 ++- .../src/create_inference_dataset.py | 298 ++++++++++++++---- 3 files changed, 286 insertions(+), 65 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/.gitignore b/configurations/surface-dummy-model_DINI/.gitignore index dac143b..6ec839b 100644 --- a/configurations/surface-dummy-model_DINI/.gitignore +++ b/configurations/surface-dummy-model_DINI/.gitignore @@ -3,3 +3,4 @@ inference_artifact/ *.yaml inference_workdir/ +.env diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index 6e99bc4..16b4815 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -3,16 +3,56 @@ # # This script is intended to be run in a container, and assumes that during the # container image build that the inference artifact was unpacked to -# inference_artifact/ +# inference_artifact/. You can also run this script interactively if you have +# extracted the inference artifact yourself. 
+# +# The selection of datasets to use for input to the model, analysis time and +# forecast duration is controlled by the following environment variables: +# DATASTORE_INPUT_PATHS, ANALYSIS_TIME, FORECAST_DURATION and NUM_EVAL_STEPS +# (the latter should be inferred from FORECAST_DURATION, but that is TODO) +# +# - DATASTORE_INPUT_PATHS is a comma-separated list of mappings of +# {datastore_name}:{input_name}={input_path} +# - ANALYSIS_TIME is the analysis time to start the forecast from in ISO8601 +# format +# - FORECAST_DURATION is the duration of the forecast in ISO8601 duration +# format and affects the length of the produced inference dataset +# - NUM_EVAL_STEPS is the number of autoregressive steps to run during +# inference. This should be consistent with FORECAST_DURATION and the model +# configuration (e.g. if the model was trained on 3-hourly data and +# FORECAST_DURATION is PT18H then NUM_EVAL_STEPS should be 6) # make this script fail on any error set -e +## Runtime configuration (variables expected to change on every execution) +# enable use of .env so that during development we can set environment (e.g. 
+# paths to replace in datastore config) +if [ -f .env ] ; then + echo "Sourcing local .env file" + set -a && source .env && set +a +fi + +# set default override of input paths in the datastore config used for creating the +# inference dataset if environment variable isn't set +DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS:-"\ +danra:danra_surface=https://object-store.os-api.cci1.ecmwf.int/danra/v0.6.0dev1/single_levels.zarr/,\ +danra:danra_static=https://object-store.os-api.cci1.ecmwf.int/danra/v0.5.0/single_levels.zarr/"} +TIME_DIMENSIONS=${TIME_DIMENSIONS:-"analysis_time,elapsed_forecast_duration"} +ANALYSIS_TIME=${ANALYSIS_TIME:-"2019-02-04T12:00"} # assumed to be in UTC # forecast out to 18 hours, which means 6 steps of 3 hours each (the model was # trained on 3-hourly analysis data) -NUM_EVAL_STEPS=6 +FORECAST_DURATION=${FORECAST_DURATION:-"PT18H"} +NUM_EVAL_STEPS=${NUM_EVAL_STEPS:-6} -# model specific parameters, ideally these would come from some config +echo "Creating forecast using following runtime args:" +echo " DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS}" +echo " TIME_DIMENSIONS=${TIME_DIMENSIONS}" +echo " ANALYSIS_TIME=${ANALYSIS_TIME}" +echo " FORECAST_DURATION=${FORECAST_DURATION}" +echo " NUM_EVAL_STEPS=${NUM_EVAL_STEPS}" + +## Model specific inference configuration (same across all executions) NUM_HIDDEN_DIMS=2 GRAPH_NAME="multiscale" HIEARCHICAL_GRAPH=false @@ -23,13 +63,12 @@ else CREATE_GRAPH_ARG="" fi +## Setup working directories INFERENCE_ARTIFACT_PATH="./inference_artifact" INFERENCE_WORK_PATH="./inference_workdir" - # XXX: these mount points could come from config.yaml for the model run configuration INPUT_DATASETS_ROOT_PATH="${INFERENCE_WORK_PATH}/inputs" OUTPUT_DATASETS_ROOT_PATH="${INFERENCE_WORK_PATH}/outputs" - mkdir -p ${OUTPUT_DATASETS_ROOT_PATH} # disable weights and biases logging, without this --eval with neural-lam fails @@ -44,6 +83,9 @@ uv run wandb disabled # b) include the statistics from the training dataset and # c) set 
the dimensions in the configuration to have `analysis_time` and # `elapsed_forecast_duration` instead of just `time`. +DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS} \ +ANALYSIS_TIME=${ANALYSIS_TIME} \ +FORECAST_DURATION=${FORECAST_DURATION} \ uv run python src/create_inference_dataset.py ## 2. Create graph diff --git a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py index 9497d97..392f3c7 100644 --- a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py +++ b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py @@ -1,43 +1,94 @@ import copy import datetime +import os from pathlib import Path from typing import Dict +import isodate import mllam_data_prep as mdp import mllam_data_prep.config as mdp_config import xarray as xr from loguru import logger -from neural_lam.config import NeuralLAMConfig +from neural_lam.config import DatastoreSelection, NeuralLAMConfig FP_TRAINING_CONFIG = "inference_artifact/configs/config.yaml" -FP_TRAINING_DATASTORE_STATS = ( - "inference_artifact/stats/danra.datastore.stats.zarr" -) -FP_TRAINING_DATASTORE_CONFIG = ( - "inference_artifact/configs/danra.datastore.yaml" -) - -S3_BUCKET_URL = "https://object-store.os-api.cci1.ecmwf.int/danra" -OVERWRITE_INPUT_PATHS = dict( - danra_surface=f"{S3_BUCKET_URL}/v0.6.0dev1/single_levels.zarr/", - danra_static=f"{S3_BUCKET_URL}/v0.5.0/single_levels.zarr/", -) -ANALYSIS_TIME = "2019-02-04T12:00" -FORECAST_DURATION = datetime.timedelta(hours=6) - # the path below describes where to save the inference datastore config, # inference zarr dataset and the inference config for neural-lam itself FP_INFERENCE_WORKDIR = "inference_workdir" -FP_INFERENCE_DATASTORE_CONFIG = f"{FP_INFERENCE_WORKDIR}/danra.datastore.yaml" FP_INFERENCE_CONFIG = f"{FP_INFERENCE_WORKDIR}/config.yaml" +def _parse_datastore_input_paths(s: str) -> Dict[str, Dict[str, str]]: + """ + Parse a 
comma-separated list of {datastore_name}:{input_name}={input_path} + into a dictionary of dictionaries. + + Parameters + ---------- + s : str + The string to parse. + + Returns + ------- + Dict[str, Dict[str, str]] + A dictionary of dictionaries. + """ + result = {} + for item in s.split(","): + try: + datastore_input, input_path = item.split("=") + datastore_name, input_name = datastore_input.split(":") + except ValueError: + raise ValueError( + f"Invalid format for DATASTORE_INPUT_PATHS item: {item}. " + f"Expected format is {{datastore_name}}:{{input_name}}={{input_path}}" + ) + if datastore_name not in result: + result[datastore_name] = {} + result[datastore_name][input_name] = input_path + return result + + +REQUIRED_ENV_VARS = { + # comma-separated list of {datastore_name}:{input_name}={input_path} + "DATASTORE_INPUT_PATHS": _parse_datastore_input_paths, + # iso8601 datetime string, e.g. 2019-02-04T12:00+0000 + "ANALYSIS_TIME": isodate.parse_datetime, + # iso8160 duration string, e.g. PT6H for 6 hours + "FORECAST_DURATION": isodate.parse_duration, + # comma-separated list of time dimensions to replace, e.g. + # time,forecast_reference_time + "TIME_DIMENSIONS": lambda s: s.split(","), +} + + +def _parse_env_vars() -> Dict[str, any]: + """ + Parse and validate required environment variables. + + Returns + ------- + Dict[str, any] + A dictionary of parsed environment variables. 
+ """ + env_vars = {} + for var, parser in REQUIRED_ENV_VARS.items(): + value = os.getenv(var) + if value is None: + raise EnvironmentError(f"Environment variable {var} is not set.") + try: + env_vars[var] = parser(value) + except Exception as e: + raise ValueError(f"Error parsing environment variable {var}: {e}") + return env_vars + + def _create_inference_datastore_config( training_config: mdp.Config, forecast_analysis_time: datetime.datetime, forecast_duration: datetime.timedelta, + time_dimensions: list[str], overwrite_input_paths: Dict[str, str] = {}, - sampling_dim: str = "time", ) -> mdp.Config: """ From a training datastore config, create an inference datastore config that: @@ -46,7 +97,7 @@ def _create_inference_datastore_config( - has a single split called "test" with a single time slice given by the `forecast_analysis_time` argument - optionally overwrites input paths with the `overwrite_input_paths` argument - - ensures that the output variables have the correct dimensions, i.e. + - ensures that the output variables have the correct dimensions, for example replacing `time` with [`analysis_time`, `elapsed_forecast_duration`] - ensures that the input datasets have the correct dimensions and dim_mappings, i.e. replacing `time` with [`analysis_time`, `elapsed_forecast_duration` @@ -59,11 +110,14 @@ def _create_inference_datastore_config( The analysis time to use for the inference config forecast_duration : datetime.timedelta The forecast duration to use for the inference config + time_dimensions : list[str], optional + The list of time dimensions to replace `time` with, for example + replacing `time` with [`analysis_time`, `elapsed_forecast_duration`], + the first dimension is assumed to be the sampling dimension (e.g. 
the + analysis time) overwrite_input_paths : Dict[str, str], optional A dictionary of input names and paths to overwrite in the training config, by default {} - sampling_dim : str, optional - The new sampling dimension to use, by default "time" Returns ------- @@ -72,12 +126,17 @@ def _create_inference_datastore_config( """ # the new sampling dimension is `analysis_time` old_sampling_dim = "time" - sampling_dim = "analysis_time" + if not isinstance(time_dimensions, list) or len(time_dimensions) == 0: + raise ValueError( + "time_dimensions must be a non-empty list of strings, got " + f"{time_dimensions}" + ) + sampling_dim = time_dimensions[0] # instead of only having `time` as dimension, the input forecast datasets # have two dimensions that describe the time value [analysis_time, # elapsed_forecast_duration] dim_replacements = dict( - time=["analysis_time", "elapsed_forecast_duration"], + time=time_dimensions, ) # there will be a single split called "test" # split_name = "test" @@ -175,9 +234,31 @@ def _create_inference_datastore_config( return inference_config -def _prepare_inference_dataset_zarr() -> str: +def _prepare_inference_dataset_zarr( + datastore_name: str, + datastore_input_paths: Dict[str, str], + analysis_time: datetime.datetime, + forecast_duration: datetime.timedelta, + time_dimensions: list[str], +) -> str: """ - Prepare the inference dataset. + Prepare the inference dataset for a single datastore. + + Parameters + ---------- + datastore_name : str + The name of the datastore to prepare the inference dataset for, this + sets the expected path of the training datastore config and stats. + datastore_input_paths : Dict[str, str] + A dictionary of input names and paths to overwrite in the training + config. + analysis_time : datetime.datetime + The analysis time to use for the inference dataset. + forecast_duration : datetime.timedelta + The forecast duration to use for the inference dataset. 
+ time_dimensions : list[str] + The list of time dimensions to replace `time` with, for example + replacing `time` with [`analysis_time`, `elapsed_forecast_duration`] Returns ------- @@ -186,61 +267,138 @@ def _prepare_inference_dataset_zarr() -> str: is saved as a zarr store in the same directory as the config file, with the same name but with a .zarr extension instead of .yaml. """ - if Path(FP_INFERENCE_DATASTORE_CONFIG).exists(): - logger.info( - f"Found existing inference datastore config at " - f"{FP_INFERENCE_DATASTORE_CONFIG}, skipping dataset creation" - ) - return FP_INFERENCE_DATASTORE_CONFIG - - ds_stats = xr.open_dataset(FP_TRAINING_DATASTORE_STATS) + fp_training_datastore_stats = ( + f"inference_artifact/stats/{datastore_name}.datastore.stats.zarr" + ) + ds_stats = xr.open_dataset(fp_training_datastore_stats) logger.debug(f"Opened stats dataset: {ds_stats}") + fp_training_datastore_config = ( + f"inference_artifact/configs/{datastore_name}.datastore.yaml" + ) + logger.debug( - f"Loading training datastore config from {FP_TRAINING_DATASTORE_CONFIG}" + f"Loading training datastore config from {fp_training_datastore_config}" ) datastore_training_config = mdp.Config.from_yaml_file( - FP_TRAINING_DATASTORE_CONFIG + fp_training_datastore_config ) inference_config = _create_inference_datastore_config( training_config=datastore_training_config, - forecast_analysis_time=datetime.datetime.fromisoformat(ANALYSIS_TIME), - forecast_duration=FORECAST_DURATION, - overwrite_input_paths=OVERWRITE_INPUT_PATHS, - sampling_dim="analysis_time", + forecast_analysis_time=analysis_time, + forecast_duration=forecast_duration, + overwrite_input_paths=datastore_input_paths, + time_dimensions=time_dimensions, ) - ds = mdp.create_dataset(config=inference_config, ds_stats=ds_stats) + fp_inference_datastore_config = ( + f"{FP_INFERENCE_WORKDIR}/{datastore_name}.datastore.yaml" + ) + + Path(fp_inference_datastore_config).parent.mkdir( + parents=True, exist_ok=True + ) + logger.info( 
+ f"Saving inference datastore config to {fp_inference_datastore_config}" + ) # neural-lam's convention is to have the same name for the zarr store # as the config file, but with .zarr extension - fp_dataset = FP_INFERENCE_DATASTORE_CONFIG.replace(".yaml", ".zarr") + fp_dataset = fp_inference_datastore_config.replace(".yaml", ".zarr") + inference_config.to_yaml_file(fp_inference_datastore_config) - Path(FP_INFERENCE_DATASTORE_CONFIG).parent.mkdir( - parents=True, exist_ok=True - ) - inference_config.to_yaml_file(FP_INFERENCE_DATASTORE_CONFIG) + ds = mdp.create_dataset(config=inference_config, ds_stats=ds_stats) ds.to_zarr(fp_dataset) logger.info(f"Saved inference dataset to {fp_dataset}") - return FP_INFERENCE_DATASTORE_CONFIG + return fp_inference_datastore_config + + +def _prepare_all_inference_dataset_zarr( + analysis_time: datetime.datetime, + forecast_duration: datetime.timedelta, + datastore_input_paths: Dict[str, Dict[str, str]], + time_dimensions: list[str], +) -> str: + """ + Prepare the inference dataset. + Parameters + ---------- + analysis_time : datetime.datetime + The analysis time to use for the inference dataset(s). + forecast_duration : datetime.timedelta + The forecast duration to use for the inference dataset(s). + datastore_input_paths : Dict[str, Dict[str,str]] + A dictionary of datastore names and their corresponding input names + and paths to overwrite in the training config. + time_dimensions : list[str] + The list of time dimensions to replace `time` with, for example + replacing `time` with [`analysis_time`, `elapsed_forecast_duration`] -def _create_inference_config(fp_inference_datastore_config: str) -> str: + Returns + ------- + Dict[str, str] + A dictionary of datastore names and the path to their corresponding + inference datastore config file. The inference dataset is saved as a + zarr store in the same directory as the config file, with the same + name but with a .zarr extension instead of .yaml. 
+ """ + fps_datastore_configs = {} + for datastore_name, input_paths in datastore_input_paths.items(): + logger.info(f"Processing {datastore_name} datastore for inference") + fp_training_datastore_config = _prepare_inference_dataset_zarr( + datastore_name=datastore_name, + datastore_input_paths=input_paths, + analysis_time=analysis_time, + forecast_duration=forecast_duration, + time_dimensions=time_dimensions, + ) + + fps_datastore_configs[datastore_name] = fp_training_datastore_config + + return fps_datastore_configs + + +def _create_inference_config( + fps_inference_datastore_config: Dict[str, str] +) -> str: training_config = NeuralLAMConfig.from_yaml_file(FP_TRAINING_CONFIG) inference_config = copy.deepcopy(training_config) - # overwrite the path to the datastore config, to point to the - # inference datastore config - inference_config.datastore.config_path = Path( - fp_inference_datastore_config - ).relative_to(Path(FP_INFERENCE_CONFIG).parent) - - # XXX: There is a bug in neural-lam here that means that the datastore kind - # doesn't correctly get serialised to a string in the config file when - # saved to yaml - inference_config.datastore.kind = "mdp" + def _set_datastore_config_path(node: DatastoreSelection, fp: str): + node.config_path = Path(fp).relative_to( + Path(FP_INFERENCE_CONFIG).parent + ) + # XXX: There is a bug in neural-lam here that means that the datastore kind + # doesn't correctly get serialised to a string in the config file when + # saved to yaml + node.kind = str(node.kind) + + # see if the neural-lam config was for single or multiple datastores + if hasattr(training_config, "datastores"): + # using multiple datastores + for ( + datastore_name, + fp_datastore_config, + ) in fps_inference_datastore_config.items(): + if datastore_name not in inference_config.datastores: + raise ValueError( + f"Datastore {datastore_name} not found in training config. 
" + f"Available datastores are: " + f"{list(inference_config.datastores.keys())}" + ) + _set_datastore_config_path( + node=inference_config.datastores[datastore_name], + fp=fp_datastore_config, + ) + else: + fp_datastore_config = list(fps_inference_datastore_config.values())[0] + # using a single datastore + _set_datastore_config_path( + node=inference_config.datastore, fp=fp_datastore_config + ) inference_config.to_yaml_file(FP_INFERENCE_CONFIG) logger.info(f"Saved inference config to {FP_INFERENCE_CONFIG}") @@ -250,11 +408,31 @@ def _create_inference_config(fp_inference_datastore_config: str) -> str: @logger.catch(reraise=True) def main(): - fp_inference_datastore_config = _prepare_inference_dataset_zarr() + env_vars = _parse_env_vars() + # convert analysis time to UTC and strip timezone info + analysis_time = ( + env_vars["ANALYSIS_TIME"] + .astimezone(datetime.timezone.utc) + .replace(tzinfo=None) + ) + + fps_inference_datastore_config = _prepare_all_inference_dataset_zarr( + analysis_time=analysis_time, + forecast_duration=env_vars["FORECAST_DURATION"], + datastore_input_paths=env_vars["DATASTORE_INPUT_PATHS"], + time_dimensions=env_vars["TIME_DIMENSIONS"], + ) _create_inference_config( - fp_inference_datastore_config=fp_inference_datastore_config + fps_inference_datastore_config=fps_inference_datastore_config ) if __name__ == "__main__": - main() + with_debugger = os.getenv("MLWM_DEBUGGER", "0") + if with_debugger == "ipdb": + import ipdb + + with ipdb.launch_ipdb_on_exception(): + main() + else: + main() From db44c91a452460dd7309697b81c7b83452cdb065 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Fri, 26 Sep 2025 16:28:57 +0200 Subject: [PATCH 13/19] add developing notes --- .../surface-dummy-model_DINI/DEVELOPING.md | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 configurations/surface-dummy-model_DINI/DEVELOPING.md diff --git a/configurations/surface-dummy-model_DINI/DEVELOPING.md 
b/configurations/surface-dummy-model_DINI/DEVELOPING.md new file mode 100644 index 0000000..a1eee63 --- /dev/null +++ b/configurations/surface-dummy-model_DINI/DEVELOPING.md @@ -0,0 +1,51 @@ +# Development notes + +## Local development + +- currently image build only works on amd64 machines (i.e. not on macos) + +- image build requires `aws` cli which can be retrieved from https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip + +- to load AWS crendentials from `.aws/credentials` you can use the following script (drop in e.g. `~/.bashrc`): + +```bash +aws-load-creds() { + local profile=$1 + if [[ -z "$profile" ]]; then + echo "❌ Usage: aws-load-creds " + return 1 + fi + + local access_key + local secret_key + + access_key=$(aws configure get aws_access_key_id --profile "$profile" 2>/dev/null) + secret_key=$(aws configure get aws_secret_access_key --profile "$profile" 2>/dev/null) + + if [[ -z "$access_key" || -z "$secret_key" ]]; then + echo "❌ The config profile '$profile' could not be found or is incomplete." + return 1 + fi + + export AWS_ACCESS_KEY_ID="$access_key" + export AWS_SECRET_ACCESS_KEY="$secret_key" + + echo "✅ Loaded AWS credentials from profile: $profile" +} + +aws-list-profiles() { + echo "📂 AWS profiles found:" + grep '^\[profile ' ~/.aws/config 2>/dev/null | sed 's/^\[profile //' | sed 's/\]//' + grep '^\[' ~/.aws/credentials 2>/dev/null | sed 's/^\[//' | sed 's/\]//' +} +``` + +- to set the environment variables for `./entry.sh` you can use a `.env` file. E.g. to run with DINI forecast data you would use: + +```bash +# .env +ANALYSIS_TIME="2025-09-22T120000Z" +DINI_ZARR="s3://harmonie-zarr/dini/control/${ANALYSIS_TIME}/single_levels.zarr/" +DATASTORE_INPUT_PATHS="danra:danra_surface=${DINI_ZARR},danra:danra_static=${DINI_ZARR}" +TIME_DIMENSIONS="time" +``` From ebbf192fe4d62e339d5cb81fa14cd7d1e6ffdcaa Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Mon, 29 Sep 2025 10:48:41 +0200 Subject: [PATCH 14/19] ":" -> "." 
in datastore input path overrides --- configurations/surface-dummy-model_DINI/DEVELOPING.md | 2 +- configurations/surface-dummy-model_DINI/entry.sh | 6 +++--- .../src/create_inference_dataset.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/DEVELOPING.md b/configurations/surface-dummy-model_DINI/DEVELOPING.md index a1eee63..d99937a 100644 --- a/configurations/surface-dummy-model_DINI/DEVELOPING.md +++ b/configurations/surface-dummy-model_DINI/DEVELOPING.md @@ -46,6 +46,6 @@ aws-list-profiles() { # .env ANALYSIS_TIME="2025-09-22T120000Z" DINI_ZARR="s3://harmonie-zarr/dini/control/${ANALYSIS_TIME}/single_levels.zarr/" -DATASTORE_INPUT_PATHS="danra:danra_surface=${DINI_ZARR},danra:danra_static=${DINI_ZARR}" +DATASTORE_INPUT_PATHS="danra.danra_surface=${DINI_ZARR},danra.danra_static=${DINI_ZARR}" TIME_DIMENSIONS="time" ``` diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index 16b4815..bc9a943 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -12,7 +12,7 @@ # (the latter should be inferred from FORECAST_DURATION, but that is TODO) # # - DATASTORE_INPUT_PATHS is a comma-separated list of mappings of -# {datastore_name}:{input_name}={input_path} +# {datastore_name}.{input_name}={input_path} # - ANALYSIS_TIME is the analysis time to start the forecast from is ISO8601 # format # - FORECAST_DURATION is the duration of the forecast in ISO8601 duration @@ -36,8 +36,8 @@ fi # set default override of input paths in the datastore config used for creating the # inference dataset if environment variable isn't set DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS:-"\ -danra:danra_surface=https://object-store.os-api.cci1.ecmwf.int/danra/v0.6.0dev1/single_levels.zarr/,\ -danra:danra_static=https://object-store.os-api.cci1.ecmwf.int/danra/v0.5.0/single_levels.zarr/"} 
+danra.danra_surface=https://object-store.os-api.cci1.ecmwf.int/danra/v0.6.0dev1/single_levels.zarr/,\ +danra.danra_static=https://object-store.os-api.cci1.ecmwf.int/danra/v0.5.0/single_levels.zarr/"} TIME_DIMENSIONS=${TIME_DIMENSIONS:-"analysis_time,elapsed_forecast_duration"} ANALYSIS_TIME=${ANALYSIS_TIME:-"2019-02-04T12:00"} # assumed to be in UTC # forecast out to 18 hours, which means 6 steps of 3 hours each (the model was diff --git a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py index 392f3c7..69db777 100644 --- a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py +++ b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py @@ -20,7 +20,7 @@ def _parse_datastore_input_paths(s: str) -> Dict[str, Dict[str, str]]: """ - Parse a comma-separated list of {datastore_name}:{input_name}={input_path} + Parse a comma-separated list of {datastore_name}.{input_name}={input_path} into a dictionary of dictionaries. Parameters @@ -37,7 +37,7 @@ def _parse_datastore_input_paths(s: str) -> Dict[str, Dict[str, str]]: for item in s.split(","): try: datastore_input, input_path = item.split("=") - datastore_name, input_name = datastore_input.split(":") + datastore_name, input_name = datastore_input.split(".") except ValueError: raise ValueError( f"Invalid format for DATASTORE_INPUT_PATHS item: {item}. 
" From c0e06a14c382243387235298d9308516ca977a30 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Tue, 30 Sep 2025 14:26:03 +0200 Subject: [PATCH 15/19] update for upstream fixes --- .../surface-dummy-model_DINI/pyproject.toml | 6 +++--- .../src/create_inference_dataset.py | 19 ++++++++++++++----- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/pyproject.toml b/configurations/surface-dummy-model_DINI/pyproject.toml index 1a0777c..7ee2dfb 100644 --- a/configurations/surface-dummy-model_DINI/pyproject.toml +++ b/configurations/surface-dummy-model_DINI/pyproject.toml @@ -30,10 +30,10 @@ dev = [ [tool.isort] profile = "black" -[build-system] -requires = ["setuptools>=61", "setuptools_scm"] -build-backend = "setuptools.build_meta" [tool.uv.sources] mllam-data-prep = { git = "https://github.com/leifdenby/mllam-data-prep", rev = "feat/inference-cli-args" } neural-lam = { git = "https://github.com/leifdenby/neural-lam", rev = "dev/first-inference-image" } +[build-system] +requires = ["setuptools>=61", "setuptools_scm"] +build-backend = "setuptools.build_meta" diff --git a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py index 69db777..d208d8d 100644 --- a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py +++ b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py @@ -7,6 +7,7 @@ import isodate import mllam_data_prep as mdp import mllam_data_prep.config as mdp_config +import parse import xarray as xr from loguru import logger from neural_lam.config import DatastoreSelection, NeuralLAMConfig @@ -16,6 +17,7 @@ # inference zarr dataset and the inference config for neural-lam itself FP_INFERENCE_WORKDIR = "inference_workdir" FP_INFERENCE_CONFIG = f"{FP_INFERENCE_WORKDIR}/config.yaml" +DATASTORE_INPUT_PATH_FORMAT = "{datastore_name}.{input_name}={input_path}" def 
_parse_datastore_input_paths(s: str) -> Dict[str, Dict[str, str]]: @@ -35,16 +37,23 @@ def _parse_datastore_input_paths(s: str) -> Dict[str, Dict[str, str]]: """ result = {} for item in s.split(","): - try: - datastore_input, input_path = item.split("=") - datastore_name, input_name = datastore_input.split(".") - except ValueError: + parts = parse.parse(DATASTORE_INPUT_PATH_FORMAT, item) + if parts is None: raise ValueError( f"Invalid format for DATASTORE_INPUT_PATHS item: {item}. " - f"Expected format is {{datastore_name}}:{{input_name}}={{input_path}}" + f"Expected format is {DATASTORE_INPUT_PATH_FORMAT}" ) + datastore_name = parts["datastore_name"] + input_name = parts["input_name"] + input_path = parts["input_path"] + if datastore_name not in result: result[datastore_name] = {} + elif input_name in result[datastore_name]: + raise ValueError( + f"Duplicate input name {input_name} for datastore " + f"{datastore_name} in DATASTORE_INPUT_PATHS" + ) result[datastore_name][input_name] = input_path return result From 62bd766f06259bad68e806484da57ba8b0f83d16 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Tue, 30 Sep 2025 15:40:13 +0200 Subject: [PATCH 16/19] expose workdir through env var --- .../surface-dummy-model_DINI/entry.sh | 16 +++++---- .../src/create_inference_dataset.py | 36 ++++++++++++------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index bc9a943..a0cd0f7 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -44,6 +44,7 @@ ANALYSIS_TIME=${ANALYSIS_TIME:-"2019-02-04T12:00"} # assumed to be in UTC # trained on 3-hourly analysis data) FORECAST_DURATION=${FORECAST_DURATION:-"PT18H"} NUM_EVAL_STEPS=${NUM_EVAL_STEPS:-6} +INFERENCE_WORKDIR=${INFERENCE_WORKDIR:-"./inference_workdir"} echo "Creating forecast using following runtime args:" echo " 
DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS}" @@ -51,6 +52,7 @@ echo " TIME_DIMENSIONS=${TIME_DIMENSIONS}" echo " ANALYSIS_TIME=${ANALYSIS_TIME}" echo " FORECAST_DURATION=${FORECAST_DURATION}" echo " NUM_EVAL_STEPS=${NUM_EVAL_STEPS}" +echo " INFERENCE_WORKDIR=${INFERENCE_WORKDIR}" ## Model specific inference configuration (same across all executions) NUM_HIDDEN_DIMS=2 @@ -65,10 +67,8 @@ fi ## Setup working directories INFERENCE_ARTIFACT_PATH="./inference_artifact" -INFERENCE_WORK_PATH="./inference_workdir" -# XXX: these mount points could come from config.yaml for the model run configuration -INPUT_DATASETS_ROOT_PATH="${INFERENCE_WORK_PATH}/inputs" -OUTPUT_DATASETS_ROOT_PATH="${INFERENCE_WORK_PATH}/outputs" +INPUT_DATASETS_ROOT_PATH="${INFERENCE_WORKDIR}/inputs" +OUTPUT_DATASETS_ROOT_PATH="${INFERENCE_WORKDIR}/outputs" mkdir -p ${OUTPUT_DATASETS_ROOT_PATH} # disable weights and biases logging, without this --eval with neural-lam fails @@ -86,15 +86,17 @@ uv run wandb disabled DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS} \ ANALYSIS_TIME=${ANALYSIS_TIME} \ FORECAST_DURATION=${FORECAST_DURATION} \ +TIME_DIMENSIONS=${TIME_DIMENSIONS} \ +INFERENCE_WORKDIR=${INFERENCE_WORKDIR} \ uv run python src/create_inference_dataset.py ## 2. Create graph # TODO: could cache this, although that isn't implemented at the moment -uv run python -m neural_lam.create_graph --config_path ${INFERENCE_WORK_PATH}/config.yaml \ +uv run python -m neural_lam.create_graph --config_path ${INFERENCE_WORKDIR}/config.yaml \ --name ${GRAPH_NAME} ${CREATE_GRAPH_ARG} ## 3. 
Run inference -uv run python -m neural_lam.train_model --config_path ${INFERENCE_WORK_PATH}/config.yaml \ +uv run python -m neural_lam.train_model --config_path ${INFERENCE_WORKDIR}/config.yaml \ --eval test\ --graph ${GRAPH_NAME} \ --hidden_dim ${NUM_HIDDEN_DIMS} \ @@ -110,7 +112,7 @@ uv run python -m neural_lam.train_model --config_path ${INFERENCE_WORK_PATH}/con # that manually here but maybe mllam-data-prep should be able to merge inputs # originating from the same zarr dataset path? uv run python -m mllam_data_prep.recreate_inputs \ - --config-path ${INFERENCE_WORK_PATH}/danra.datastore.yaml \ + --config-path ${INFERENCE_WORKDIR}/danra.datastore.yaml \ --output-path-format "${OUTPUT_DATASETS_ROOT_PATH}/{input_name}.zarr" \ ${OUTPUT_DATASETS_ROOT_PATH}/inference_output.zarr diff --git a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py index d208d8d..aae1692 100644 --- a/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py +++ b/configurations/surface-dummy-model_DINI/src/create_inference_dataset.py @@ -13,10 +13,6 @@ from neural_lam.config import DatastoreSelection, NeuralLAMConfig FP_TRAINING_CONFIG = "inference_artifact/configs/config.yaml" -# the path below describes where to save the inference datastore config, -# inference zarr dataset and the inference config for neural-lam itself -FP_INFERENCE_WORKDIR = "inference_workdir" -FP_INFERENCE_CONFIG = f"{FP_INFERENCE_WORKDIR}/config.yaml" DATASTORE_INPUT_PATH_FORMAT = "{datastore_name}.{input_name}={input_path}" @@ -68,6 +64,9 @@ def _parse_datastore_input_paths(s: str) -> Dict[str, Dict[str, str]]: # comma-separated list of time dimensions to replace, e.g. 
# time,forecast_reference_time "TIME_DIMENSIONS": lambda s: s.split(","), + # inference working directory, relative to where inference config and + # datasets are saved + "INFERENCE_WORKDIR": str, } @@ -246,6 +245,7 @@ def _create_inference_datastore_config( def _prepare_inference_dataset_zarr( datastore_name: str, datastore_input_paths: Dict[str, str], + fp_inference_workdir: str, analysis_time: datetime.datetime, forecast_duration: datetime.timedelta, time_dimensions: list[str], @@ -261,6 +261,9 @@ def _prepare_inference_dataset_zarr( datastore_input_paths : Dict[str, str] A dictionary of input names and paths to overwrite in the training config. + fp_inference_workdir : str + The path to the inference working directory, where the inference + datastore config(s) and zarr dataset(s) will be saved. analysis_time : datetime.datetime The analysis time to use for the inference dataset. forecast_duration : datetime.timedelta @@ -302,7 +305,7 @@ def _prepare_inference_dataset_zarr( ) fp_inference_datastore_config = ( - f"{FP_INFERENCE_WORKDIR}/{datastore_name}.datastore.yaml" + f"{fp_inference_workdir}/{datastore_name}.datastore.yaml" ) Path(fp_inference_datastore_config).parent.mkdir( @@ -318,8 +321,8 @@ def _prepare_inference_dataset_zarr( inference_config.to_yaml_file(fp_inference_datastore_config) ds = mdp.create_dataset(config=inference_config, ds_stats=ds_stats) + logger.info(f"Writing inference dataset to {fp_dataset}") ds.to_zarr(fp_dataset) - logger.info(f"Saved inference dataset to {fp_dataset}") return fp_inference_datastore_config @@ -328,6 +331,7 @@ def _prepare_all_inference_dataset_zarr( analysis_time: datetime.datetime, forecast_duration: datetime.timedelta, datastore_input_paths: Dict[str, Dict[str, str]], + fp_inference_workdir: str, time_dimensions: list[str], ) -> str: """ @@ -342,6 +346,9 @@ def _prepare_all_inference_dataset_zarr( datastore_input_paths : Dict[str, Dict[str,str]] A dictionary of datastore names and their corresponding input names 
and paths to overwrite in the training config. + fp_inference_workdir : str + The path to the inference working directory, where the inference + datastore config(s) and zarr dataset(s) will be saved. time_dimensions : list[str] The list of time dimensions to replace `time` with, for example replacing `time` with [`analysis_time`, `elapsed_forecast_duration`] @@ -360,6 +367,7 @@ def _prepare_all_inference_dataset_zarr( fp_training_datastore_config = _prepare_inference_dataset_zarr( datastore_name=datastore_name, datastore_input_paths=input_paths, + fp_inference_workdir=fp_inference_workdir, analysis_time=analysis_time, forecast_duration=forecast_duration, time_dimensions=time_dimensions, @@ -371,14 +379,16 @@ def _prepare_all_inference_dataset_zarr( def _create_inference_config( - fps_inference_datastore_config: Dict[str, str] + fps_inference_datastore_config: Dict[str, str], fp_inference_workdir: str ) -> str: training_config = NeuralLAMConfig.from_yaml_file(FP_TRAINING_CONFIG) inference_config = copy.deepcopy(training_config) + fp_inference_config = f"{fp_inference_workdir}/config.yaml" + def _set_datastore_config_path(node: DatastoreSelection, fp: str): node.config_path = Path(fp).relative_to( - Path(FP_INFERENCE_CONFIG).parent + Path(fp_inference_config).parent ) # XXX: There is a bug in neural-lam here that means that the datastore kind # doesn't correctly get serialised to a string in the config file when @@ -409,10 +419,10 @@ def _set_datastore_config_path(node: DatastoreSelection, fp: str): node=inference_config.datastore, fp=fp_datastore_config ) - inference_config.to_yaml_file(FP_INFERENCE_CONFIG) - logger.info(f"Saved inference config to {FP_INFERENCE_CONFIG}") + inference_config.to_yaml_file(fp_inference_config) + logger.info(f"Saved inference config to {fp_inference_config}") - return FP_INFERENCE_CONFIG + return fp_inference_config @logger.catch(reraise=True) @@ -429,10 +439,12 @@ def main(): analysis_time=analysis_time, 
forecast_duration=env_vars["FORECAST_DURATION"], datastore_input_paths=env_vars["DATASTORE_INPUT_PATHS"], + fp_inference_workdir=env_vars["INFERENCE_WORKDIR"], time_dimensions=env_vars["TIME_DIMENSIONS"], ) _create_inference_config( - fps_inference_datastore_config=fps_inference_datastore_config + fps_inference_datastore_config=fps_inference_datastore_config, + fp_inference_workdir=env_vars["INFERENCE_WORKDIR"], ) From 47904d426b2931d314bd44262f9ffb26f44de368 Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Tue, 30 Sep 2025 15:40:26 +0200 Subject: [PATCH 17/19] use single gpu during inference --- configurations/surface-dummy-model_DINI/entry.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index a0cd0f7..931bf22 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -98,6 +98,7 @@ uv run python -m neural_lam.create_graph --config_path ${INFERENCE_WORKDIR}/conf ## 3. 
Run inference uv run python -m neural_lam.train_model --config_path ${INFERENCE_WORKDIR}/config.yaml \ --eval test\ + --devices 1\ # parallel write of zarr over multiple GPUs not implemented yet --graph ${GRAPH_NAME} \ --hidden_dim ${NUM_HIDDEN_DIMS} \ --ar_steps_eval ${NUM_EVAL_STEPS} \ From 88353704302cb9c6d9457a51e675948d712d184d Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Tue, 30 Sep 2025 15:45:25 +0200 Subject: [PATCH 18/19] no inline comments in multiline bash commands --- configurations/surface-dummy-model_DINI/entry.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index 931bf22..b38e6c9 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -96,9 +96,10 @@ uv run python -m neural_lam.create_graph --config_path ${INFERENCE_WORKDIR}/conf --name ${GRAPH_NAME} ${CREATE_GRAPH_ARG} ## 3. Run inference +# NB: parallel write of zarr over multiple GPUs not implemented yet, so can only use one GPU for now uv run python -m neural_lam.train_model --config_path ${INFERENCE_WORKDIR}/config.yaml \ --eval test\ - --devices 1\ # parallel write of zarr over multiple GPUs not implemented yet + --devices 1\ --graph ${GRAPH_NAME} \ --hidden_dim ${NUM_HIDDEN_DIMS} \ --ar_steps_eval ${NUM_EVAL_STEPS} \ From 1755b690cf73ede1de5ab2be1fb10d24f197e0ad Mon Sep 17 00:00:00 2001 From: Leif Denby Date: Tue, 27 Jan 2026 13:05:12 +0100 Subject: [PATCH 19/19] commit missing files --- .../surface-dummy-model_DINI/README.md | 12 ++++++ .../surface-dummy-model_DINI/entry.sh | 12 +++--- .../run_inference_with_dini.sh | 38 +++++++++++++++++++ 3 files changed, 57 insertions(+), 5 deletions(-) create mode 100644 configurations/surface-dummy-model_DINI/run_inference_with_dini.sh diff --git a/configurations/surface-dummy-model_DINI/README.md b/configurations/surface-dummy-model_DINI/README.md index
16a0b0d..3582644 100644 --- a/configurations/surface-dummy-model_DINI/README.md +++ b/configurations/surface-dummy-model_DINI/README.md @@ -5,6 +5,18 @@ surface variables from DANRA, only 10 days of data and only trained 10 epochs. It is intended only as a demonstration of the inference pipeline and is expected to give very poor results. +## Building image and running inference + +To build the image on "superjuice" (`27sj894.dmi.dk`) we need to set the AWS tokens to read the inference artifact and also use the local http proxy for pulling the base image: + +```bash +export AWS_SECRET_ACCESS_KEY= +export AWS_ACCESS_KEY_ID= +export MLWM_PULL_PROXY=http://squid1.dmi.dk:3128 +``` + + + ## Upstream package change requirements Relative to the `main` branch on both github.com/mllam/mllam-data-prep and diff --git a/configurations/surface-dummy-model_DINI/entry.sh b/configurations/surface-dummy-model_DINI/entry.sh index b38e6c9..c8a2037 100755 --- a/configurations/surface-dummy-model_DINI/entry.sh +++ b/configurations/surface-dummy-model_DINI/entry.sh @@ -33,6 +33,12 @@ if [ -f .env ] ; then set -a && source .env && set +a fi +## Model specific inference configuration (same across all executions) +NUM_HIDDEN_DIMS=2 +GRAPH_NAME="multiscale" +HIEARCHICAL_GRAPH=false +MODEL_TIMESTEP="PT3H" # model trained on 3-hourly data + # set default override of input paths in the datastore config used for creating the # inference dataset if environment variable isn't set DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS:-"\ @@ -54,11 +60,7 @@ echo " FORECAST_DURATION=${FORECAST_DURATION}" echo " NUM_EVAL_STEPS=${NUM_EVAL_STEPS}" echo " INFERENCE_WORKDIR=${INFERENCE_WORKDIR}" -## Model specific inference configuration (same across all executions) -NUM_HIDDEN_DIMS=2 -GRAPH_NAME="multiscale" -HIEARCHICAL_GRAPH=false - +# set cli argument for creating hierarchical graph if needed if [ "$HIEARCHICAL_GRAPH" = true ] ; then CREATE_GRAPH_ARG="--hierarchical" else diff --git 
a/configurations/surface-dummy-model_DINI/run_inference_with_dini.sh b/configurations/surface-dummy-model_DINI/run_inference_with_dini.sh new file mode 100644 index 0000000..8dd18cc --- /dev/null +++ b/configurations/surface-dummy-model_DINI/run_inference_with_dini.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +# This script runs the inference container using initial conditions from DINI +# stored on AWS + +# The script takes only one argument: the analysis time to use for +# inference, in ISO8601 format (e.g. 2025-11-05T090000Z). + +if [ "$#" -ne 1 ]; then + echo "Usage: $0 ANALYSIS_TIME" + exit 1 +fi +ANALYSIS_TIME="$1" + +DINI_ZARR="s3://harmonie-zarr/dini/control/${ANALYSIS_TIME}/single_levels.zarr/" +DATASTORE_INPUT_PATHS="danra.danra_surface=${DINI_ZARR},danra.danra_static=${DINI_ZARR}" + +ANALYSIS_TIME=${ANALYSIS_TIME} +DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS} +TIME_DIMENSIONS=time + +podman run --rm \ + --device /dev/nvidia0 \ + --device /dev/nvidiactl \ + --device /dev/nvidia-uvm \ + --device /dev/nvidia-uvm-tools \ + --device /dev/nvidia-modeset \ + -v /lib/x86_64-linux-gnu/libcuda.so.1:/lib/x86_64-linux-gnu/libcuda.so.1:ro \ + -v /lib/x86_64-linux-gnu/libnvidia-ml.so.1:/lib/x86_64-linux-gnu/libnvidia-ml.so.1:ro \ + -v /lib/x86_64-linux-gnu/libnvidia-ptxjitcompiler.so.1:/lib/x86_64-linux-gnu/libnvidia-ptxjitcompiler.so.1:ro \ + --shm-size=32g \ + -v ./inference_workdir/:/workspace/inference_workdir/ \ + -e DATASTORE_INPUT_PATHS="${DATASTORE_INPUT_PATHS}" \ + -e TIME_DIMENSIONS="${TIME_DIMENSIONS}" \ + -e ANALYSIS_TIME="${ANALYSIS_TIME}" \ + -e FORECAST_DURATION="PT18H" \ + -e NUM_EVAL_STEPS=6 \ + localhost/surface-dummy-model_dini:latest