diff --git a/ESMF_Mesh_Domain_Configuration_Production/NextGen_hyfab_to_ESMF_Mesh.py b/ESMF_Mesh_Domain_Configuration_Production/NextGen_hyfab_to_ESMF_Mesh.py index b06402b5..dc4bf59e 100644 --- a/ESMF_Mesh_Domain_Configuration_Production/NextGen_hyfab_to_ESMF_Mesh.py +++ b/ESMF_Mesh_Domain_Configuration_Production/NextGen_hyfab_to_ESMF_Mesh.py @@ -41,7 +41,7 @@ def convert_hyfab_to_esmf(hyfab_gpkg: pathlib.Path, esmf_mesh_output: pathlib.Pa # Eventually, we'll add code to slice catchment ids # but for now just use feature ids - element_ids = np.array(np.array([elem.split('-')[1] for elem in np.array(hyfab.divide_id.values, dtype=str)], dtype=float), dtype=int) + element_ids = np.array(np.array([elem for elem in np.array(hyfab.div_id.values, dtype=str)], dtype=float), dtype=int) hyfab_coords = np.empty((len(element_ids), 2), dtype=float) hyfab_coords[:, 0] = element_ids hyfab_coords[:, 1] = element_ids diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py index c1b2f692..50f46463 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py @@ -216,16 +216,16 @@ def initialize(self, config_file: str, output_path: str | None = None) -> None: self._job_meta = ConfigOptions(self.cfg_bmi) # Parse the configuration options - try: - self._job_meta.validate_config(self.cfg_bmi) - except KeyboardInterrupt as e: - err_handler.err_out_screen("User keyboard interrupt", e) - except ImportError as e: - err_handler.err_out_screen("Missing Python packages", e) - except InterruptedError as e: - err_handler.err_out_screen("External kill signal detected", e) - except Exception as e: - err_handler.err_out_screen("Unhandled exception", e) + # try: + # self._job_meta.validate_config(self.cfg_bmi) + # except KeyboardInterrupt as e: + # err_handler.err_out_screen("User keyboard interrupt", e) + # except ImportError as e: + # err_handler.err_out_screen("Missing Python packages", e) + # except InterruptedError as e: + # err_handler.err_out_screen("External kill signal detected", e) + # except Exception as e: + # err_handler.err_out_screen("Unhandled exception", e) # Set NWM version and config, if provided in the config if self.cfg_bmi.get("NWM_VERSION") is not None: @@ -389,8 +389,24 @@ def initialize_with_params( :param output_path: The output path for model results. If omitted, a default path will be generated. :raises ValueError: If an invalid grid type is specified, an exception is raised. """ + # This is required prior to the first log message. + LOG.bind() + + bmi_cfg_file = Path(config_file).resolve() + if not bmi_cfg_file.is_file(): + LOG.critical(f"Config file {bmi_cfg_file} not found, nothing to do...") + raise RuntimeError( + f"Config file {bmi_cfg_file} not found, nothing to do..." + ) + + LOG.info(f"Reading config file: {bmi_cfg_file}") + with bmi_cfg_file.open("r") as fp: + cfg = yaml.safe_load(fp) + + self.cfg_bmi = parse_config(cfg) # Set the job metadata parameters (b_date, geogrid) using config_options - self._job_meta = ConfigOptions(self.cfg_bmi, b_date=b_date, geogrid_arg=geogrid) + self.cfg_bmi = parse_config(cfg) + self._job_meta = ConfigOptions(self.cfg_bmi, b_date=b_date, geogrid=geogrid) # Now that _job_meta is set, call initialize() to set up the core model self.initialize(config_file, output_path=output_path) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/config.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/config.py index a8fbeead..82f47900 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/config.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/config.py @@ -1,12 +1,20 @@ +from __future__ import annotations + import configparser import json import os import re from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Any import ewts import numpy as np +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core import mpi_utils +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.consts import ( + CONFIGOPTIONS, + FORCINGINPUTMOD, +) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.err_handler import ( err_out_screen, ) @@ -14,125 +22,36 @@ calculate_lookback_window, ) -from . import mpi_utils - LOG = ewts.get_logger(ewts.FORCING_ID) -FORCE_COUNT = 27 class ConfigOptions: """Configuration abstract class for configuration options read in from the file specified by the user.""" - def __init__(self, config: dict, b_date=None, geogrid_arg=None): + def __init__(self, cfg_bmi: dict, b_date: str = None, geogrid: str = None) -> None: """Initialize the configuration class to empty None attributes. - param config: The user-specified path to the configuration file. + The attributes of this class are populated by the validate_config function, which reads in the configuration file and checks that all necessary options are provided and properly formatted. The attributes of this class are used to control the flow of the program and the processing of input forcings. + + Args: + cfg_bmi (dict): The configuration dictionary read in from the configuration file specified by the user. This should be read in using the config_utils.read_config function, which also handles any necessary preprocessing of the configuration file. + b_date (str, optional): The beginning date of processing in the format YYYYMMDDHHMM. This is used to calculate the processing window for realtime simulations. If not provided, it will be read from the configuration file. + geogrid (str, optional): The filepath to the geogrid file to be used for processing. This is used to specify the grid information for regridding input forcings. If not provided, it will be read from the configuration file. + """ - self.bmi_time = None - self.current_time = None - self.bmi_time_index = 0 - self.input_forcings = None - self.precip_only_flag = False - self.supp_precip_forcings = None - self.input_force_dirs = None - self.input_force_types = None - self.supp_precip_dirs = None - self.supp_precip_file_types = None - self.supp_precip_param_dir = None - self.input_force_mandatory = None - self.supp_precip_mandatory = None - self.supp_pcp_max_hours = None - self.number_inputs = None - self.number_supp_pcp = None - self.number_custom_inputs = 0 - self.output_freq = None - self.sub_output_hour = None - self.sub_output_freq = None - self.scratch_dir = None - self.useCompression = 0 - self.useFloats = 0 - self.num_output_steps = None - self.num_supp_output_steps = None - self.actual_output_steps = None - self.realtime_flag = None - self.refcst_flag = None - self.ana_flag = None + if geogrid is not None: + self.user_provided_geogrid_flag = True + else: + self.user_provided_geogrid_flag = False + self.b_date_proc = b_date - self.e_date_proc = None - self.first_fcst_cycle = None - self.current_fcst_cycle = None - self.current_output_step = None - self.cycle_length_minutes = None - self.prev_output_date = None - self.current_output_date = None - self.look_back = None - self.future_time = None - self.fcst_freq = None - self.nFcsts = None - self.fcst_shift = None - self.fcst_input_horizons = None - self.fcst_input_offsets = None - self.process_window = None - self.spatial_meta = None - self.grid_type = None - self.grid_meta = None - self.ExactExtract = None - self.lat_var = None - self.lon_var = None - self.hgt_var = None - self.cosalpha_var = None - self.sinalpha_var = None - self.slope_var = None - self.slope_azimuth_var = None - self.slope_var_elem = None - self.slope_azimuth_var_elem = None - self.nodecoords_var = None - self.elemcoords_var = None - self.elemconn_var = None - self.numelemconn_var = None - self.element_id_var = None - self.hgt_elem_var = None - self.ignored_border_widths = None - self.regrid_opt = None - self.weightsDir = None - self.regrid_opt_supp_pcp = None - self.config_path = config - self.errMsg = None - self.statusMsg = None - self.logFile = None - self.logHandle = None - self.dScaleParamDirs = None - self.paramFlagArray = None - self.forceTemoralInterp = None - self.suppTemporalInterp = None - self.t2dDownscaleOpt = None - self.swDownscaleOpt = None - self.psfcDownscaleOpt = None - self.precipDownscaleOpt = None - self.q2dDownscaleOpt = None - self.t2BiasCorrectOpt = None - self.psfcBiasCorrectOpt = None - self.q2BiasCorrectOpt = None - self.windBiasCorrect = None - self.swBiasCorrectOpt = None - self.lwBiasCorrectOpt = None - self.precipBiasCorrectOpt = None - self.runCfsNldasBiasCorrect = False - self.cfsv2EnsMember = None - self.customSuppPcpFreq = None - self.customFcstFreq = None - self.rqiMethod = None - self.rqiThresh = 1.0 + self.cfg_bmi = cfg_bmi + self.geogrid = geogrid + + self.bmi_time_index = 0 self.globalNdv = -9999.0 self.d_program_init = datetime.now(timezone.utc) self.errFlag = 0 - self.nwmVersion = None - self.nwmConfig = None - self.include_lqfrac = False - self.forcing_output = None - self.aws = None - self.aws_obj = None - self.aws_time = None self.aorc_conus_source = "s3://noaa-nws-aorc-v1-1-1km" self.aorc_conus_year_url = "{source}/{year}.zarr" self.aorc_alaska_source = "s3://ngwpc-data/AORC/Alaska" @@ -141,19 +60,233 @@ def __init__(self, config: dict, b_date=None, geogrid_arg=None): ) self.nwm_source = "s3://noaa-nwm-retrospective-3-0-pds" - self.nwm_geogrid = None - self.geogrid = geogrid_arg - self.geopackage = None + self._scratch_dir_has_been_uniquefied = False + self.supp_precip_forcings = self.extract_input_variable("SuppPcp") + if not self.precip_only_flag: + self.input_forcings = self.extract_input_variable("InputForcings") + + # Create temporary array to hold flags if we need input parameter files. + self.param_flag = np.zeros([len(self.input_forcings)], int) - self.uid64 = None + # set list of attibutes from consts.py to None. + # These are indexed from the consts dictionary using the class name + for attr in CONFIGOPTIONS[self.__class__.__name__]: + setattr(self, attr, None) self.broadcast_new_64bit_uid() - self._scratch_dir_has_been_uniquefied = False + for ( + cfg_bmi_attr, + config_options_attr, + ) in self.try_config_get_except_attr_map.items(): + setattr(self, config_options_attr, self.try_config_get(cfg_bmi_attr)) + + self.set_attrs(CONFIGOPTIONS["extract_input_variable_attrs_map"]) + + if self.precip_only_flag: + self.set_attrs( + CONFIGOPTIONS["extract_input_variable_attrs_map_precip_only"] + ) + self.set_attrs( + CONFIGOPTIONS["extract_input_variable_attrs_map_not_precip_only"], + set_none=True, + ) + else: + self.set_attrs( + CONFIGOPTIONS["extract_input_variable_attrs_map_not_precip_only"] + ) + self.set_attrs( + CONFIGOPTIONS["extract_input_variable_attrs_map_precip_only"], + set_none=True, + ) + if 27 in self.input_forcings: + self.nwm_geogrid = self.extract_input_variable("NWMGeogridIn") + + if self.perform_downscaling: + self.set_attrs(CONFIGOPTIONS["downscaling_attrs_map"]) + if self.grid_type == "unstructured": + self.set_attrs(CONFIGOPTIONS["downscaling_unstructred_attrs_map"]) + + for cfg_bmi_attr, config_options_attr in CONFIGOPTIONS[ + "extract_input_variable_set_default_attrs_map" + ].items(): + if config_options_attr in ["supp_pcp_max_hours","weightsDir"]: + default = None + else: + default = 0 + setattr( + self, + config_options_attr, + self.extract_input_variable_set_default(cfg_bmi_attr, default), + ) + + @property + def try_config_get_except_attr_map(self) -> dict: + """Get the mapping of configuration variable names to class attribute names for variables that are extracted directly from the configuration file without any additional processing. This is used to control how variables are extracted from the configuration file and assigned to class attributes in a consistent way based on the mapping specified in the consts.py file.""" + dict_map = CONFIGOPTIONS["try_config_get_except_attr_map"] + if self._b_date_proc is not None and "RefcstBDateProc" in dict_map: + dict_map.pop("RefcstBDateProc") + if self.geogrid is not None and "GeogridIn" in dict_map: + dict_map.pop("GeogridIn") + return dict_map + + @property + def cfg_bmi(self) -> dict: + """Return the configuration dictionary read in from the configuration file specified by the user.""" + return self._cfg_bmi + + @cfg_bmi.setter + def cfg_bmi(self, value: dict) -> None: + """Set the configuration dictionary read in from the configuration file specified by the user.""" + if not isinstance(value, dict): + raise TypeError( + f"Expected dict, got {type(value)} for type of cfg_bmi: {value}" + ) + self._cfg_bmi = value + + @property + def force_count(self) -> int: + """Calculate the number of total possible input forcing options based on the length of the InputForcings list in the consts.py file. This is used for error checking to ensure users specify valid input forcing options in the configuration file.""" + return len(FORCINGINPUTMOD["PRODUCT_NAME"]) + + @property + def supp_precip_count(self) -> int: + """Calculate the number of total possible supplemental precip forcing options based on the length of the SuppPrecipForcings list in the consts.py file. This is used for error checking to ensure users specify valid supplemental precip forcing options in the configuration file.""" + # TODO make this dynamic based on the length of the SUPPPRECIPMOD list in consts.py, but for now hardcoding to 15 since that is the number of options currently available in consts.py and this will avoid any issues with the formatting of the consts.py file causing errors in the program. This is used for error checking to ensure users specify valid supplemental precip forcing options in the configuration file. + # return len(SUPPPRECIPMOD["suppPrecipMod"]["PRODUCT_NAMES"]) + return 15 + + @property + def number_supp_pcp(self) -> int: + """Calculate the number of supplemental precip forcings specified by the user in the configuration file.""" + return len(self.supp_precip_forcings) + + @property + def precip_only_flag(self) -> bool: + """Flag to indicate whether the user has chosen to run the supplemental precip forcings module only, which will trigger some different processing pathways and error checking for certain configuration options.""" + precip_only = False + if self.number_supp_pcp == 1: + if int(self.supp_precip_forcings[0]) == 14: + precip_only = True + return precip_only + + def set_attrs(self, attrs_dict: dict, set_none: bool = False): + """Set the attributes of the class based on the configuration file. This is used to populate the attributes of the class after they have been read in and validated from the configuration file.""" + for cfg_bmi_attr, config_options_attr in attrs_dict.items(): + if set_none: + attr = None + else: + attr = self.extract_input_variable(cfg_bmi_attr) + setattr(self, config_options_attr, attr) + + def set_attrs_use_default(self, attrs_dict: dict): + """Set the attributes of the class based on the configuration file. Set default value to default if not found in config file.""" + for cfg_bmi_attr, config_options_attr in attrs_dict.items(): + setattr( + self, + config_options_attr, + self.extract_input_variable_set_default(cfg_bmi_attr), + ) + + def extract_input_variable(self, variable_name: str) -> str: + """Extract the variable name from the configuration file for a given variable.""" + try: + return self.cfg_bmi[variable_name] + except ValueError as e: + err_out_screen( + f"Improper {variable_name} value specified in the configuration file. Error: {e}" + ) + except (KeyError, configparser.NoOptionError) as e: + err_out_screen( + f"Unable to locate {variable_name} in the configuration file. Error: {e}" + ) + except json.decoder.JSONDecodeError as e: + err_out_screen( + f"Improper {variable_name} file option specified in configuration file. Error: {e}", + e, + ) + + def extract_input_variable_set_default(self, variable_name: str, default=0) -> str: + """Extract the variable name from the configuration file for a given variable, and set it to a default value if it is not found.""" + try: + variable = self.cfg_bmi[variable_name] + except (KeyError, configparser.NoOptionError) as e: + variable = default + except ValueError as e: + err_out_screen( + f"Improper {variable_name} value: {self.cfg_bmi[variable_name]}", e + ) + if default == 0: + if variable not in [0, 1]: + err_out_screen(f"Please choose a {variable_name} value of 0 or 1.") + return variable + + def try_config_get(self, variable_name: str) -> str: + """Try to get a variable from the configuration file, and return a default value if it is not found.""" + try: + var = self.cfg_bmi.get(variable_name) + if var is None: + err_out_screen( + f"Unable to locate {variable_name} in the configuration file." + ) + return var + except (KeyError, configparser.NoOptionError) as e: + err_out_screen( + f"Unable to locate {variable_name} in the configuration file.", e + ) + + def check_number_of_inputs( + self, value: list, variable_name: str, input_type: str, number_inputs: int + ) -> None: + """Check that the number of inputs specified by the user in the configuration file matches the expected number of inputs for a given variable.""" + if len(value) != number_inputs: + err_out_screen( + f"Number of {variable_name} values must match the number of {input_type} in the configuration file." + ) + + def check_number_of_inputs_forcings(self, value: list, variable_name: str) -> None: + """Check that the number of inputs specified by the user in the configuration file matches the expected number of inputs for a given variable, specifically for input forcings variables which should match the number of input forcing options specified by the user in the configuration file.""" + return self.check_number_of_inputs( + value, variable_name, " InputForcings", self.number_inputs + ) + + def check_number_of_inputs_supp_pcp(self, value: list, variable_name: str) -> None: + """Check that the number of inputs specified by the user in the configuration file matches the expected number of inputs for a given variable, specifically for supplemental precip forcing variables which should match the number of supplemental precip forcing options specified by the user in the configuration file.""" + return self.check_number_of_inputs( + value, variable_name, " SupplementalPrecipForcings", self.number_supp_pcp + ) + + def check_input_values_in_range( + self, value: list, variable_name: str, valid_input_options: list + ) -> None: + """Check that the input values specified by the user in the configuration file are within a valid range for a given variable.""" + for val in value: + if val not in valid_input_options: + err_out_screen( + f"Invalid {variable_name} value '{val}' specified in configuration file. Please specify valid values: {valid_input_options}." + ) + + def check_input_values_non_negative(self, value: list, variable_name: str) -> None: + """Check that the input values specified by the user in the configuration file are positive for a given variable.""" + for val in value: + if float(val) < 0: + err_out_screen( + f"Invalid {variable_name} value '{val}' specified in configuration file. Please specify values greater than or equal to zero." + ) + + def check_input_values_positive(self, value: list, variable_name: str) -> None: + """Check that the input values specified by the user in the configuration file are positive for a given variable.""" + for val in value: + if val <= 0: + err_out_screen( + f"Invalid {variable_name} value '{val}' specified in configuration file. Please specify values greater than zero." + ) def uniquefy_scratch_dir_as_child(self, uid: str) -> None: """Modify the existing scratch dir by adding the UID string available to all ranks from the MpiConfig class. + This may only be called once. Subsequent calls will result in an error. - This must be called by all ranks, once.""" + This must be called by all ranks, once. + """ LOG.debug(f"Uniquefying scratch dir: adding suffix {uid} to {self.scratch_dir}") if not isinstance(uid, str): raise TypeError(f"Expected str, got {type(uid)} for type of uid: {uid}") @@ -165,222 +298,405 @@ def uniquefy_scratch_dir_as_child(self, uid: str) -> None: ) self.scratch_dir = os.path.join(self.scratch_dir, uid) self._scratch_dir_has_been_uniquefied = True - self.make_scratch_dir() - def make_scratch_dir(self) -> None: + def make_scratch_dir(self, scratch_dir: str) -> None: """Make the scratch dir and its parents.""" - os.makedirs(self.scratch_dir, exist_ok=True) - LOG.debug(f"Scratch dir: {self.scratch_dir}") + os.makedirs(scratch_dir, exist_ok=True) + LOG.debug(f"Scratch dir: {scratch_dir}") - def broadcast_new_64bit_uid(self): + def broadcast_new_64bit_uid(self) -> None: """Broadcast a random uint64 then save the hash of that to self.uid64, which effectively broadcasts the same unique string to all ranks. - Should be called once to avoid confusion.""" + + Should be called once to avoid confusion. + """ if self.uid64 is not None: raise RuntimeError("self.uid64 has already been initialized.") self.uid64 = mpi_utils.get_new_broadcasted_uid() - def validate_config(self, cfg_bmi: dict) -> None: - """Validate in options from the configuration file and check that proper options were provided.""" - # Ensure b_date_proc is set; if not, read from the configuration file - if self.b_date_proc is None: - try: - self.b_date_proc = cfg_bmi.get( - "RefcstBDateProc", None - ) # Default to None if not found - if self.b_date_proc is None: - err_out_screen( - "Unable to locate RefcstBDateProc under Logistics section in configuration file." - ) - except KeyError as e: - err_out_screen( - "Unable to locate RefcstBDateProc under Logistics section in configuration file.", - e, - ) + @property + def supp_precip_forcings(self): + """Choose a set of supplemental precipitation file(s) to layer into the final LDASIN forcing files processed from the options above. The following is a mapping of numeric values to external input native forcing files. + + 1. MRMS GRIB2 hourly radar-only QPE + 2. MRMS GRIB2 hourly gage-corrected radar QPE + 3. WRF-ARW 2.5 km 48-hr Hawaii nest precipitation. + 4. WRF-ARW 2.5 km 48-hr Puerto Rico nest precipitation. + 5. CONUS MRMS GRIB2 hourly MultiSensor QPE (Pass 2 or Pass 1) + 6. Hawaii MRMS GRIB2 hourly MultiSensor QPE (Pass 2 or Pass 1) + 7. MRMS SBCv2 Liquid Water Fraction (netCDF only) + 8. NBM Conus MR + 9. NBM Alaska MR + 10. Alaska MRMS (no liquid water fraction) + 11. Alaska Stage IV NWS Precip + 12. CONUS Stage IV NWS Precip + 13. MRMS PrecipFlag precipitation classification file + 14. Custom Frequency Supplementary Precipitation product (sub-hourly precip) + 15. NBM Puerto Rico + 16. NBM Hawaii + - Example- SuppPcp: [1, 5, 13] + """ + return self._supp_precip_forcings + + @supp_precip_forcings.setter + def supp_precip_forcings(self, value: list) -> None: + """Set the list of supplemental precip forcing options specified by the user in the configuration file. This is used to control which supplemental precip forcings are processed and how they are processed based on the other configuration options specified for each supplemental precip forcing.""" + if len(value) > 0: + self.check_input_values_in_range( + [int(i) for i in value], + "SuppPcp", + list(range(1, self.supp_precip_count + 1)), + ) + self._supp_precip_forcings = value - # Ensure geopackage is set; if not, read from the configuration file - if self.geopackage is None: - try: - self.geopackage = cfg_bmi.get( - "Geopackage", None - ) # Default to None if not found - if self.geopackage is None: - err_out_screen( - "Unable to locate Geopackage in the configuration file." - ) - except KeyError as e: - err_out_screen( - "Unable to locate Geopackage in the configuration file.", e - ) + @property + def output_freq(self) -> int: + """Get the output frequency in minutes specified by the user in the configuration file. This is used to control the output frequency of the processed forcings, and is necessary for both realtime and reforecast simulations.""" + return self._output_freq - # Ensure geogrid is set; if not, read from the configuration file - if self.geogrid is None: - try: - geogrid_base = cfg_bmi.get( - "GeogridIn", None - ) # Default to None if not found - except KeyError as e: - err_out_screen( - "Unable to locate GeogridIn in the configuration file.", e - ) - if geogrid_base is None: - err_out_screen("Unable to locate GeogridIn in the configuration file.") - self.geogrid = None - else: - geogrid_parent = os.path.dirname(geogrid_base) - geogrid_filename = os.path.basename(geogrid_base) - if self.uid64 is None: - raise ValueError("self.uid64 cannot be None, please initialize it.") - self.geogrid = os.path.join( - geogrid_parent, f"{self.uid64}_{geogrid_filename}" - ) - # Create directory for esmf_mesh file - if not os.path.isdir(geogrid_parent): - try: - os.makedirs(geogrid_parent, exist_ok=True) - LOG.debug(f"Created esmf mesh directory: {geogrid_parent}") - except OSError as e: - err_out_screen( - f"Unable to create esmf_mesh directory: {geogrid_parent}. Error: {e}" - ) + @output_freq.setter + def output_freq(self, value: int) -> None: + """Specify the output frequency in minutes. Note that any frequencies at higher intervals than what if provided as input will entail input forcing data being temporally interpolated. - # Read in the base input forcing options as an array of values to map. - try: - self.supp_precip_forcings = cfg_bmi["SuppPcp"] - except KeyError as e: + Example- OutputFrequency: 60 + """ + self.check_input_values_non_negative([value], "OutputFrequency") + self._output_freq = value + + @property + def sub_output_hour(self) -> int: + """Get the sub-daily output hour specified by the user in the configuration file. This is used to control the output frequency of the processed forcings for sub-daily output frequencies, and is only necessary if the user has chosen a sub-daily output frequency in the configuration file.""" + return self._sub_output_hour + + @sub_output_hour.setter + def sub_output_hour(self, value: int) -> None: + """Sub output hour. + + New variable currently for NWMv3.1 operations to properly ingest GFS 13km forecast data that outputs various frequencies throughout the forecast cycle lifetime. This variable will properly account for reading time slices of the forecast cycle. Currently only needed for GFS 13km operational configuration. Otherwise, set this value to 0. + + Example- SubOutputHour: 0 + """ + self.check_input_values_non_negative([value], "SubOutputHour") + if value == 0: + value = None + self._sub_output_hour = value + + @property + def sub_output_freq(self) -> int: + """Calculate the sub-daily output frequency in minutes based on the output frequency and sub-daily output hour specified by the user in the configuration file. This is used to control the output frequency of the processed forcings for sub-daily output frequencies, and is only necessary if the user has chosen a sub-daily output frequency in the configuration file.""" + return self._sub_output_freq + + @sub_output_freq.setter + def sub_output_freq(self, value: int) -> None: + """Sub output frequency. + + New variable currently for NWMv3.1 operations to properly ingest GFS 13km forecast data that outputs various frequencies throughout the forecast cycle lifetime. This variable will properly account for reading time slices of the forecast cycle. Currently only needed for GFS 13km operational configuration. Otherwise, set this value to 0. + + Example- SubOutputFreq: 0 + """ + if value < 0: err_out_screen( - "Unable to locate SuppPcp under SuppForcing section in configuration file.", - e, + "Please specify an SubOutFreq that is greater than zero minutes." ) - except configparser.NoOptionError as e: + if value == 0: + value = None + self._sub_output_freq = value + + @property + def scratch_dir(self) -> str: + """Specify a scratch directory that will be used for storage of temporary files. These files will be removed automatically by the program. at the end of the BMI instance. However, this directory will also store the output forcing file if requested by the user as well (will not be deleted in this instance). + + Example- ScratchDir: "./ScratchDir + """ + return self._scratch_dir + + @scratch_dir.setter + def scratch_dir(self, value: str) -> None: + """Set the pathway to the scratch directory specified by the user in the configuration file. This is used to control where intermediate files are written during processing, and is necessary for both realtime and reforecast simulations.""" + self.make_scratch_dir(value) + self._scratch_dir = value + + @property + def useCompression(self) -> int: + """Flag to activate scale_factor / add_offset byte packing in the output files. 0 - Deactivate compression 1 - Activate compression, Only applicable in this instance when you request a netcdf output forcing file (Output: 1). Otherwise, just set to 0. + + Example- compressOutput: 0 + """ + return self._useCompression + + @useCompression.setter + def useCompression(self, value: int) -> None: + """Set the flag for whether to use compression when writing output files specified by the user in the configuration file. This is used to control whether output files are compressed, which can save disk space but may increase processing time.""" + if value is None: + value = 0 + self.check_input_values_in_range([value], "compressOutput", [0, 1]) + self._useCompression = value + + @property + def ana_flag(self) -> int: + """If this is AnA run, set AnAFlag to 1, otherwise 0. Setting this flag will change the behavior of some Bias Correction routines as the ForecastInputOffsets options. + + Example- AnAFlag: 1 + """ + return self._ana_flag + + @ana_flag.setter + def ana_flag(self, value: int) -> None: + """Set the flag for whether to include the analysis time step in the output files specified by the user in the configuration file. This is used to control whether the analysis time step is included in the output files, which can be useful for certain applications but may not be necessary for all users.""" + value = int(value) + self.check_input_values_in_range([value], "AnAFlag", [0, 1]) + self._ana_flag = value + + @property + def look_back(self) -> int: + """Specify a lookback period in minutes to process data. This is required if you are only processing an AnA operational configuration. This value should specify how far back you need to look in time from your "RefcstBDateProc" start date that you specified. In this instance, that start date will be your actual end date. If no LookBack specified, please specify -9999. + + Example- LookBack: 180 + """ + return self._look_back + + @look_back.setter + def look_back(self, value: int) -> None: + """Set the look back window in hours specified by the user in the configuration file. This is used to calculate the processing window for reforecast simulations, and is only necessary if the user is running a reforecast simulation with a specified processing window rather than a realtime simulation.""" + if value <= 0 and value != -9999: + err_out_screen("Please specify a positive LookBack or -9999 for realtime.") + if value != -9999: + calculate_lookback_window(self) + self._look_back = value + + @property + def fcst_freq(self) -> int: + """Specify a forecast frequency in minutes. This value specifies how often to generate a set of forecast forcings. If generating hourly retrospective forcings, specify this value to be 60. + + Example- ForecastFrequency: 60 + """ + return self._fcst_freq + + @fcst_freq.setter + def fcst_freq(self, value: int) -> None: + """Set the forecast frequency in hours specified by the user in the configuration file. This is used to calculate the processing window for reforecast simulations, and is only necessary if the user is running a reforecast simulation with a specified processing window rather than a realtime simulation.""" + self.check_input_values_non_negative([value], "ForecastFrequency") + if value > 1440: err_out_screen( - "Unable to locate SuppPcp under SuppForcing section in configuration file.", - e, + "Only forecast cycles of daily or sub-daily are supported at this time" ) - except json.decoder.JSONDecodeError as e: - err_out_screen("Improper SuppPcp option specified in configuration file", e) + self._fcst_freq = value - self.number_supp_pcp = len(self.supp_precip_forcings) + @property + def spatial_meta(self): + """Specify the optional land spatial metadata file. If found, coordinate projection information and coordinate will be translated from to the final output file. This variable is only a special case if the user is specifying the original WRF-Hydro domain from earlier NWM versions. Otherwise, just leave the one blank (''). - if self.number_supp_pcp == 1: - if int(self.supp_precip_forcings[0]) == 14: - self.precip_only_flag = True + Example- SpatialMetaIn: ./GEOGRID_LDASOUT_Spatial_Metadata_CONUS.nc + """ + return self._spatial_meta - if not self.precip_only_flag: - # Read in the base input forcing options as an array of values to map. - try: - self.input_forcings = cfg_bmi["InputForcings"] - except KeyError as e: - err_out_screen( - "Unable to locate InputForcings under Input section in configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate InputForcings under Input section in configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper InputForcings option specified in configuration file", e - ) - if len(self.input_forcings) == 0: + @spatial_meta.setter + def spatial_meta(self, value: str) -> None: + """Set the spatial metadata options specified by the user in the configuration file. This is used to control how spatial metadata is handled during processing, and is necessary for both realtime and reforecast simulations.""" + if len(value) == 0: + # No spatial metadata file found. + value = None + else: + if not os.path.isfile(value): err_out_screen( - "Please choose at least one InputForcings dataset to process" + f"Unable to locate optional spatial metadata file: {value}." ) - self.number_inputs = len(self.input_forcings) + self._spatial_meta = value - # Check to make sure forcing options make sense - for force_opt in self.input_forcings: - if force_opt < 0 or force_opt > FORCE_COUNT: - err_out_screen( - f"Please specify InputForcings values between 1 and {FORCE_COUNT}." - ) + @property + def b_date_proc(self) -> str: + """If running an operational configuration in realtime or just using a retrospective dataset (NWM, AORC, ERA5), this will be the defined start date for the NextGen Forcing Engine BMI which is assumed to be the beginning of the forecast cycle (i.e. hour 0) or just the start date of the retrospective dataset. From there the first time step will be hour 1 from the start date specified here. If you're running an AnA configuration however, this variable becomes the end date of the simulation and the "LookBack" value specified above will be how far back you look in time for the AnA operational configuration. - # Keep tabs on how many custom input forcings we have. - if force_opt == 10: - self.number_custom_inputs = self.number_custom_inputs + 1 - - # Flag to force mandatory configuration option to specify the NWM geogrid file if user requests - # NWM forcing files to be regridded to a given domain configuration - if force_opt == 27: - try: - self.nwm_geogrid = cfg_bmi["NWM_Geogrid"] - except KeyError as e: - err_out_screen( - "Unable to locate NWM Geogrid file required for the NWM forcings module. Need to specify the pathway to the NWM geo_em_DOMAIN.nc file to the NWM_Geogrid configuration input option within the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate NWM Geogrid file required for the NWM forcings module. Need to specify the pathway to the NWM geo_em_DOMAIN.nc file to the NWM_Geogrid configuration input option within the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper NWM Geogrid file option specified in configuration file", - e, - ) + Example- RefcstBDateProc: 202210071400 + """ + return self._b_date_proc - # Read in the input forcings types (GRIB[1|2], NETCDF) - try: - # self.input_force_types = config.get('Input', 'InputForcingTypes').strip("[]").split(',') - # self.input_force_types = [ftype.strip() for ftype in self.input_force_types] - self.input_force_types = cfg_bmi["InputForcingTypes"] - if self.input_force_types == [""]: - self.input_force_types = [] - except KeyError as e: + @b_date_proc.setter + def b_date_proc(self, value: str | datetime) -> None: + """Set the beginning date of processing for reforecast simulations. This is used to calculate the processing window for reforecast simulations.""" + if isinstance(value, datetime): + self._b_date_proc = value + if value != -9999: + if isinstance(value, str) and len(value) != 12: err_out_screen( - "Unable to locate InputForcingTypes in Input section in the configuration file.", - e, + "Improper RefcstBDateProc length entered into the configuration file. Please check your entry." ) - except configparser.NoOptionError as e: + try: + self._b_date_proc = datetime.strptime(value, "%Y%m%d%H%M") + except ValueError as e: err_out_screen( - "Unable to locate InputForcingTypes in Input section in the configuration file.", + "Improper RefcstBDateProc value entered into the configuration file. Please check your entry.", e, ) - if len(self.input_force_types) != self.number_inputs: - err_out_screen( - "Number of InputForcingTypes must match the number " - "of InputForcings in the configuration file." - ) - for file_type in self.input_force_types: - if file_type not in [ - "GRIB1", - "GRIB2", - "NETCDF", - "NETCDF4", - "NWM", - "ZARR", - ]: - err_out_screen( - f'Invalid forcing file type "{file_type}" specified. ' - "Only GRIB1, GRIB2, NETCDF, NWM, and ZARR are supported" - ) + else: + self._b_date_proc = -9999 + LOG.info(f"Begin date: {value}") + + @property + def realtime_flag(self) -> bool: + """Flag to indicate whether the user has chosen to run a realtime simulation, which will trigger some different processing pathways and error checking for certain configuration options, and will also control how the processing window is calculated.""" + if self.look_back == -9999: + value = False + elif self.b_date_proc == -9999: + value = True + else: + value = False + # Calculate the beginning/ending processing dates if we are running realtime + if value: + calculate_lookback_window(self) + return value + + @property + def refcst_flag(self) -> bool: + """Flag to indicate whether the user has chosen to run a reforecast simulation, which will trigger some different processing pathways and error checking for certain configuration options, and will also control how the processing window is calculated.""" + if self.look_back == -9999: + return True + elif self.b_date_proc == -9999: + return True + else: + return False + + @property + def geopackage(self) -> str: + """Get the pathway to the geopackage file to be used for processing. This is used to specify the grid information for regridding input forcings, and is only necessary if the user is running a simulation that requires regridding of input forcings.""" + return self._geopackage + + @geopackage.setter + def geopackage(self, value: str) -> None: + """Set the pathway to the geopackage file to be used for processing. This is used to specify the grid information for regridding input forcings, and is only necessary if the user is running a simulation that requires regridding of input forcings.""" + self._geopackage = value + + @property + def geogrid(self) -> str: + """Specify a geogrid file (e.g. latitude, longitude, mesh connectivity, elevation, slope) that defines domain to which the forcings are being processed to. + + Example- GeogridIn: ./geo_em_CONUS.nc + """ + return self._geogrid + + @geogrid.setter + def geogrid(self, value: str) -> None: + """Set the pathway to the geogrid file to be used for processing. This is used to specify the grid information for regridding input forcings, and is only necessary if the user is running a simulation that requires regridding of input forcings.""" + if self.user_provided_geogrid_flag: + self._geogrid = value + if value is None: + self._geogrid = value + # err_out_screen("Unable to locate GeogridIn in the configuration file.") + else: + geogrid_parent = os.path.dirname(value) + geogrid_filename = os.path.basename(value) + if self.uid64 is None: + raise ValueError("self.uid64 cannot be None, please initialize it.") + self._geogrid = os.path.join( + geogrid_parent, f"{self.uid64}_{geogrid_filename}" + ) + self.try_make_dir(geogrid_parent, " esmf_mesh") - # Read in the input directories for each forcing option. + def try_make_dir(self, directory: str, optional_str: str = "") -> None: + """Try to make a directory, and catch any errors.""" + if not os.path.isdir(directory): try: - self.input_force_dirs = cfg_bmi["InputForcingDirectories"] - except KeyError as e: - err_out_screen( - "Unable to locate InputForcingDirectories in Input section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: + os.makedirs(directory, exist_ok=True) + LOG.debug(f"Created{optional_str} directory: {directory}") + except OSError as e: err_out_screen( - "Unable to locate InputForcingDirectories in Input section in the configuration file.", - e, + f"Unable to create{optional_str} directory: {directory}. Error: {e}" ) - if len(self.input_force_dirs) != self.number_inputs: + + @property + def input_forcings(self) -> list: + """Get the list of input forcing options specified by the user in the configuration file. This is used to control which input forcings are processed and how they are processed based on the other configuration options specified for each input forcing.""" + return self._input_forcings + + @input_forcings.setter + def input_forcings(self, value: list) -> None: + """Set the list of input forcing options specified by the user in the configuration file. This is used to control which input forcings are processed and how they are processed based on the other configuration options specified for each input forcing.""" + if not self.precip_only_flag: + self.check_input_values_in_range( + value, "InputForcings", list(range(1, self.force_count + 1)) + ) + self._input_forcings = value + + @property + def number_inputs(self) -> int: + """Calculate the number of input forcing options specified by the user in the configuration file. This is used for error checking to ensure users specify valid input forcing options in the configuration file, and to control the flow of the program based on how many input forcings are being processed.""" + if not self.precip_only_flag: + if len(self.input_forcings) == 0: err_out_screen( - "Number of InputForcingDirectories must match the number " - "of InputForcings in the configuration file." + "Please choose at least one InputForcings dataset to process" ) + return len(self.input_forcings) + + @property + def number_custom_inputs(self) -> int: + """Calculate the number of custom input forcing options specified by the user in the configuration file. This is used to control the flow of the program based on how many custom input forcings are being processed, since custom input forcings require some different processing pathways.""" + if not self.precip_only_flag: + count = 0 + for force_opt in self.input_forcings: + if force_opt == 10: + count += 1 + return count + else: + return 0 + + @property + def nwm_geogrid(self) -> str: + """Only for the NWM v3 retorspective forcing module option (27) that requires the geo_em_NWM_DOMAIN.nc file as input for the NextGen Forcings Engine to properly setup up the ESMF grid object for the NWM forcing files since that information is not readily available in the NWM v3 retrospective forcing files.""" + return self._nwm_geogrid + + @nwm_geogrid.setter + def nwm_geogrid(self, value: str) -> None: + """Set the pathway to the NWM geogrid file specified by the user in the configuration file. This is used to specify the grid information for regridding NWM input forcings, and is only necessary if the user has chosen to regrid NWM input forcings in the configuration file.""" + if not self.precip_only_flag and 27 in self.input_forcings: + self._nwm_geogrid = value + else: + self._nwm_geogrid = None + + @property + def input_force_types(self) -> list: + """Get the list of input forcing file types specified by the user in the configuration file. This is used to control how input forcings are read in and processed based on the file type specified for each input forcing in the configuration file.""" + return self._input_force_types + + @input_force_types.setter + def input_force_types(self, value: list) -> None: + """Specify the file type for each forcing (comma separated). Valid types are GRIB1, GRIB2, NETCDF, and NETCDF4. + + Example- InputForcingTypes: [GRIB2,GRIB2]\ + """ + if not self.precip_only_flag: + if value == [""]: + value = [] + self.check_number_of_inputs_forcings(value, "InputForcingTypes") + self.check_input_values_in_range( + value, "InputForcingTypes", self.file_types + ) + self._input_force_types = value + + @property + def file_types(self): + """Get the list of input forcing file types specified by the user in the configuration file. This is used to control how input forcings are read in and processed based on the file type specified for each input forcing in the configuration file.""" + return CONFIGOPTIONS["file_types"] + + @property + def input_force_dirs(self) -> list: + """Get the list of input forcing directories specified by the user in the configuration file. This is used to control where input forcings are read in from for each input forcing specified by the user in the configuration file.""" + if self._input_force_dirs: + return self._input_force_dirs + else: + None + + @input_force_dirs.setter + def input_force_dirs(self, value: list) -> None: + """Specify the input directories for each forcing product. If a user has the ability to connect to the AWS servers and they specify configuration #12 (CONUS AORC data) or configuration #27 (NWM retrospective forcing data) then this specific configuration input can be left as a blank string (""). + + Example- InputForcingDirectories: [./GFS,./NDFD] + """ + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "InputForcingDirectories") # Loop through and ensure all input directories exist. Also strip out any whitespace # or new line characters. - for dir_tmp in range(0, len(self.input_force_dirs)): - self.input_force_dirs[dir_tmp] = self.input_force_dirs[dir_tmp].strip() - - dir_path = self.input_force_dirs[dir_tmp] + for dir_tmp in range(0, len(value)): + value[dir_tmp] = value[dir_tmp].strip() + dir_path = value[dir_tmp] forcing_type = self.input_forcings[dir_tmp] is_aws_forcing = forcing_type in [12, 21, 27] @@ -388,481 +704,138 @@ def validate_config(self, cfg_bmi: dict) -> None: if is_aws_forcing: self.aws = True else: - try: - os.makedirs(dir_path, exist_ok=True) - LOG.debug(f"Created missing forcing directory: {dir_path}") - except OSError as e: - err_out_screen( - f"Unable to create forcing directory: {dir_path}. Error: {e}" - ) + self.try_make_dir(dir_path, " forcing") + self._input_force_dirs = value - # Read in the mandatory enforcement options for input forcings. - try: - self.input_force_mandatory = cfg_bmi["InputMandatory"] - except KeyError as e: - err_out_screen( - "Unable to locate InputMandatory under Input section in configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate InputMandatory under Input section in configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper InputMandatory option specified in configuration file", e - ) - - if len(self.input_force_mandatory) != self.number_inputs: - err_out_screen( - "Please specify InputMandatory values for each corresponding input " - "forcings in the configuration file." - ) - # Check to make sure enforcement options makes sense. - for enforce_opt in self.input_force_mandatory: - if enforce_opt < 0 or enforce_opt > 1: - err_out_screen( - "Invalid InputMandatory chosen in the configuration file. Please choose a value of 0 or 1 for each corresponding input forcing." - ) - - # Read in the output frequency - try: - self.output_freq = cfg_bmi["OutputFrequency"] - except ValueError as e: - err_out_screen( - "Improper OutputFrequency value specified in the configuration file." - ) - except KeyError as e: - err_out_screen( - "Unable to locate OutputFrequency in the configuration file." - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate OutputFrequency in the configuration file." - ) - if self.output_freq <= 0: - err_out_screen( - "Please specify an OutputFrequency that is greater than zero minutes." - ) - - if self.precip_only_flag: - # Read in the custom supp output frequency - try: - self.customSuppPcpFreq = int(cfg_bmi["customSuppPcpFreq"]) - except ValueError as e: - err_out_screen( - "Improper customSuppPcpFreq value specified in the configuration file.", - e, - ) - except KeyError as e: - err_out_screen( - "Unable to locate customSuppPcpFreq in the configuration file.", e - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate customSuppPcpFreq in the configuration file.", e - ) - if self.output_freq <= 0: - err_out_screen( - "Please specify an customSuppPcpFreq that is greater than zero minutes." - ) - - # Read in the sub output hour - try: - self.sub_output_hour = int(cfg_bmi["SubOutputHour"]) - except ValueError as e: - err_out_screen( - "Improper SubOutputHour value specified in the configuration file.", e - ) - except KeyError as e: - err_out_screen( - "Unable to locate SubOutputHour in the configuration file.", e - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SubOutputHour in the configuration file.", e - ) - if self.sub_output_hour < 0: - err_out_screen( - "Please specify an SubOutputHour that is greater than zero minutes." - ) - if self.sub_output_hour == 0: - self.sub_output_hour = None - # Read in the output frequency - try: - self.sub_output_freq = int(cfg_bmi["SubOutFreq"]) - except ValueError as e: - err_out_screen( - "Improper SubOutFreq value specified in the configuration file.", e - ) - except KeyError as e: - err_out_screen("Unable to locate SubOutFreq in the configuration file.", e) - except configparser.NoOptionError as e: - err_out_screen("Unable to locate SubOutFreq in the configuration file.", e) - if self.sub_output_freq < 0: - err_out_screen( - "Please specify an SubOutFreq that is greater than zero minutes." - ) - if self.sub_output_freq == 0: - self.sub_output_freq = None - - # TODO Can this be a /tmp directory? - # Read in the scratch temporary directory, which also may contain output forcing file if requested. - try: - self.scratch_dir = cfg_bmi["ScratchDir"] - except ValueError as e: - err_out_screen( - "Improper ScratchDir specified in the configuration file.", e - ) - except KeyError as e: - err_out_screen("Unable to locate ScratchDir in the configuration file.", e) - except configparser.NoOptionError as e: - err_out_screen("Unable to locate ScratchDir in the configuration file.", e) - - self.make_scratch_dir() - - # Read in compression option - try: - self.useCompression = cfg_bmi["compressOutput"] - except KeyError as e: - err_out_screen("Unable to locate compressOut in the configuration file.", e) - except configparser.NoOptionError as e: - err_out_screen("Unable to locate compressOut in the configuration file.", e) - except ValueError as e: - err_out_screen("Improper compressOut value.", e) - if self.useCompression < 0 or self.useCompression > 1: - err_out_screen("Please choose a compressOut value of 0 or 1.") - - # Read in floating-point option - try: - self.useFloats = cfg_bmi["floatOutput"] - except KeyError as e: - # err_out_screen('Unable to locate floatOutput in the configuration file.', e) - self.useFloats = 0 - except configparser.NoOptionError as e: - # err_out_screen('Unable to locate floatOutput in the configuration file.', e) - self.useFloats = 0 - except ValueError as e: - err_out_screen( - "Improper floatOutput value: {}".format(cfg_bmi["includeLQFraq"]) - ) - if self.useFloats < 0 or self.useFloats > 1: - err_out_screen("Please choose a floatOutput value of 0 or 1.") - - # Read in lqfrac option - try: - self.include_lqfrac = cfg_bmi["includeLQFrac"] - except KeyError as e: - # err_out_screen('Unable to locate includeLQFraq in the configuration file.', e) - self.include_lqfrac = 0 - except configparser.NoOptionError as e: - # err_out_screen('Unable to locate includeLQFraq in the configuration file.', e) - self.useFinclude_lqfracloats = 0 - except ValueError as e: - err_out_screen( - "Improper includeLQFrac value: {}".format(cfg_bmi["includeLQFraq"]), e - ) - if self.include_lqfrac < 0 or self.include_lqfrac > 1: - err_out_screen("Please choose an includeLQFrac value of 0 or 1.") - - # Read in Forcing output option - try: - self.forcing_output = cfg_bmi["Output"] - except KeyError as e: - self.forcing_output = 0 - except configparser.NoOptionError as e: - self.forcing_output = 0 - except ValueError as e: - err_out_screen( - "Improper Forcing Output value: {}".format(cfg_bmi["Output"]), e - ) - if self.forcing_output < 0 or self.forcing_output > 1: - err_out_screen( - "Please choose a Forcing Output value of 0 (No output) or 1 (output)." - ) - - # Read AnA flag option - try: - # check both the Forecast section and if it's not there, the old BiasCorrection location - self.ana_flag = int(cfg_bmi["AnAFlag"]) - except KeyError as e: - err_out_screen("Unable to locate AnAFlag in the configuration file.", e) - except configparser.NoOptionError as e: - err_out_screen("Unable to locate AnAFlag in the configuration file.", e) - except ValueError as e: - err_out_screen("Improper AnAFlag value ", e) - if self.ana_flag < 0 or self.ana_flag > 1: - err_out_screen("Please choose a AnAFlag value of 0 or 1.") - - # For the NextGen Forcings Engine BMI, we are assuming a realtime or reforecast simulation. - try: - self.look_back = cfg_bmi["LookBack"] - if self.look_back <= 0 and self.look_back != -9999: - err_out_screen( - "Please specify a positive LookBack or -9999 for realtime." - ) - except ValueError as e: - err_out_screen( - "Improper LookBack value entered into the configuration file. Please check your entry.", - e, - ) - except KeyError as e: - err_out_screen( - "Unable to locate LookBack in the configuration file. Please verify entries exist.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate LookBack in the configuration file. Please verify entries exist.", - e, - ) - - # Process the beginning date of reforecast forcings to process + @property + def input_force_mandatory(self) -> list: + """Get the list of input forcing mandatory flags specified by the user in the configuration file. This is used to control whether the program should raise an error if input forcings for a given forecast cycle are not found for each input forcing specified by the user in the configuration file.""" + return self._input_force_mandatory - if self.b_date_proc: - beg_date_tmp = self.b_date_proc - e = "" - else: - try: - beg_date_tmp = cfg_bmi["RefcstBDateProc"] - except KeyError as e: - err_out_screen( - "Unable to locate RefcstBDateProc under Logistics section in configuration file.", - e, - ) - beg_date_tmp = None - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate RefcstBDateProc under Logistics section in configuration file.", - e, - ) - beg_date_tmp = None + @input_force_mandatory.setter + def input_force_mandatory(self, value: list) -> None: + """Specify whether the input forcings listed above are mandatory, or optional. This is important for layering contingencies if a product is missing, but forcing files are still desired. 0 - Not mandatory, 1 - Mandatory. NOTE!!! If no files are found for any products, code will error out indicating the final field is all missing values. - if beg_date_tmp != -9999: - if isinstance(beg_date_tmp, str) and len(beg_date_tmp) != 12: - err_out_screen( - "Improper RefcstBDateProc length entered into the configuration file. Please check your entry.", - e, - ) - try: - self.b_date_proc = datetime.strptime(beg_date_tmp, "%Y%m%d%H%M") - except ValueError as e: - err_out_screen( - "Improper RefcstBDateProc value entered into the configuration file. Please check your entry.", - e, - ) - else: - self.b_date_proc = -9999 + Example- InputMandatory: [1,1] + """ + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "InputMandatory") + self.check_input_values_in_range(value, "InputMandatory", [0, 1]) + self._input_force_mandatory = value - LOG.info(f"Begin date: {beg_date_tmp}") + @property + def customSuppPcpFreq(self) -> int: + """Get the custom supplemental precip output frequency specified by the user in the configuration file. This is used to control the output frequency of supplemental precip forcings if the user has chosen to run the supplemental precip forcings module only.""" + return self._customSuppPcpFreq - # If the Retro flag is off, and lookback is off, then we assume we are - # running a reforecast. - if self.look_back == -9999: - self.realtime_flag = False - self.refcst_flag = True - elif self.b_date_proc == -9999: - self.realtime_flag = True - self.refcst_flag = True + @customSuppPcpFreq.setter + def customSuppPcpFreq(self, value: int) -> None: + """Set the custom supplemental precip output frequency specified by the user in the configuration file. This is used to control the output frequency of supplemental precip forcings if the user has chosen to run the supplemental precip forcings module only.""" + if self.precip_only_flag: + self.check_input_values_non_negative([value], "customSuppPcpFreq") + self._customSuppPcpFreq = value else: - # The processing window will be calculated based on current time and the - # lookback option since this is a realtime instance. - self.realtime_flag = False - self.refcst_flag = False - # self.b_date_proc = -9999 - # self.e_date_proc = -9999 + self._customSuppPcpFreq = None - # Calculate the delta time between the beginning and ending time of processing. - # self.process_window = self.e_date_proc - self.b_date_proc + @property + def fcst_shift(self) -> int: + """Forecast cycles are determined by splitting up a day by equal ForecastFrequency interval. If there is a desire to shift the cycles to a different time step, ForecastShift will shift forecast cycles ahead by a determined set of minutes. For example, ForecastFrequency of 6 hours will produce forecasts cycles at 00, 06, 12, and 18 UTC. However, a ForecastShift of 1 hour will produce forecast cycles at 01, 07, 13, and 18 UTC. NOTE - This is only used by the realtime instance to calculate forecast cycles accordingly. Re-forecasts will use the beginning and ending dates specified in conjunction with the forecast frequency to determine forecast cycle dates. - # Read in the ForecastFrequency option. - try: - self.fcst_freq = cfg_bmi["ForecastFrequency"] - except ValueError as e: - err_out_screen( - "Improper ForecastFrequency value entered into the configuration file. Please check your entry.", - e, - ) - except KeyError as e: - err_out_screen( - "Unable to locate ForecastFrequency in the configuration file. Please verify entries exist.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ForecastFrequency in the configuration file. Please verify entries exist.", - e, - ) - if self.fcst_freq <= 0: - err_out_screen( - "Please specify a ForecastFrequency in the configuration file greater than zero." - ) - # Currently, we only support daily or sub-daily forecasts. Any other iterations should - # be done using custom config files for each forecast cycle. - if self.fcst_freq > 1440: - err_out_screen( - "Only forecast cycles of daily or sub-daily are supported at this time" - ) + Example- ForecastShift: 0 + """ + return self._fcst_shift - # Read in the ForecastShift option. This is ONLY done for the realtime instance as - # it's used to calculate the beginning of the processing window. + @fcst_shift.setter + def fcst_shift(self, value: int) -> None: if True: # was: self.realtime_flag: - try: - self.fcst_shift = cfg_bmi["ForecastShift"] - except ValueError as e: - err_out_screen( - "Improper ForecastShift value entered into the configuration file. Please check your entry.", - e, - ) - except KeyError as e: - err_out_screen( - "Unable to locate ForecastShift in the configuration file. Please verify entries exist.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ForecastShift in the configuration file. Please verify entries exist.", - e, - ) - if self.fcst_shift < 0: - err_out_screen( - "Please specify a ForecastShift in the configuration file greater than or equal to zero." - ) - + self.check_input_values_non_negative([value], "ForecastShift") # Calculate the beginning/ending processing dates if we are running realtime if self.realtime_flag: calculate_lookback_window(self) + self._fcst_shift = value + + # NOTE this commented out code copied from pre-refactored code on 5/6/2026 + # if self.refcst_flag: + # Calculate the number of forecasts to issue, and verify the user has chosen a + # correct divider based on the dates + # dt_tmp = self.e_date_proc - self.b_date_proc + # if (dt_tmp.days * 1440 + dt_tmp.seconds / 60.0) % self.fcst_freq != 0: + # err_out_screen('Please choose an equal divider forecast frequency for your ' + # 'specified reforecast range.') + # self.nFcsts = int((dt_tmp.days * 1440 + dt_tmp.seconds / 60.0) / self.fcst_freq) + + # Flag to constrain AORC forcing data cycle output + # for optTmp in self.input_forcings: + # if optTmp == 12: + # self.nFcsts = 1 - # if self.refcst_flag: - # Calculate the number of forecasts to issue, and verify the user has chosen a - # correct divider based on the dates - # dt_tmp = self.e_date_proc - self.b_date_proc - # if (dt_tmp.days * 1440 + dt_tmp.seconds / 60.0) % self.fcst_freq != 0: - # err_out_screen('Please choose an equal divider forecast frequency for your ' - # 'specified reforecast range.') - # self.nFcsts = int((dt_tmp.days * 1440 + dt_tmp.seconds / 60.0) / self.fcst_freq) - - # Flag to constrain AORC forcing data cycle output - # for optTmp in self.input_forcings: - # if optTmp == 12: - # self.nFcsts = 1 - self.nFcsts = 1 - - if self.look_back != -9999: - calculate_lookback_window(self) + @property + def nFcsts(self): + """Get the number of forecasts to issue for a reforecast simulation based on the forecast shift and the processing window specified by the user in the configuration file. This is used to control how many forecast time steps are output for a reforecast simulation, and is only necessary if the user is running a reforecast simulation with a specified processing window rather than a realtime simulation.""" + return self._nFcsts + + @nFcsts.setter + def nFcsts(self, value: int) -> None: + """Set the number of forecasts to issue for a reforecast simulation based on the forecast shift and the processing window specified by the user in the configuration file. This is used to control how many forecast time steps are output for a reforecast simulation, and is only necessary if the user is running a reforecast simulation with a specified processing window rather than a realtime simulation.""" + if value is None: + value = 1 + self._nFcsts = value + + @property + def fcst_input_horizons(self) -> list: + """Specify how much (in minutes) of each input forcing is desires for each forecast cycle. See documentation for examples. The length of this array must match the input forcing choices. + + - Example- ForecastInputHorizons: [60, 60] + """ + return self._fcst_input_horizons + @fcst_input_horizons.setter + def fcst_input_horizons(self, value: list) -> None: if not self.precip_only_flag: - # Read in the ForecastInputHorizons options. - try: - self.fcst_input_horizons = cfg_bmi["ForecastInputHorizons"] - except KeyError as e: - err_out_screen( - "Unable to locate ForecastInputHorizons under Forecast section in configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ForecastInputHorizons under Forecast section in configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper ForecastInputHorizons option specified in configuration file", - e, - ) - if len(self.fcst_input_horizons) != self.number_inputs: + self.check_number_of_inputs_forcings(value, "ForecastInputHorizons") + self.check_input_values_non_negative(value, "ForecastInputHorizons") + else: + if len(self.fcst_input_horizons) != 1: err_out_screen( "Please specify ForecastInputHorizon values for each corresponding input forcings for forecasts." ) + self._fcst_input_horizons = value - # Check to make sure the horizons options make sense. There will be additional - # checking later when input choices are mapped to input products. - for horizonOpt in self.fcst_input_horizons: - if horizonOpt <= 0: - err_out_screen( - "Please specify ForecastInputHorizon values greater than zero." - ) - else: - # Read in the ForecastInputHorizons options. - try: - self.fcst_input_horizons = cfg_bmi["ForecastInputHorizons"] - except KeyError as e: - err_out_screen( - "Unable to locate ForecastInputHorizons under Forecast section in configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ForecastInputHorizons under Forecast section in configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper ForecastInputHorizons option specified in configuration file", - e, - ) - if len(self.fcst_input_horizons) != 1: - err_out_screen( - "Please specify ForecastInputHorizon values for each corresponding input forcings for forecasts." - ) + @property + def fcst_input_offsets(self): + """Option for applying an offset to input forcings to use a different forecasted interval. For example, a user may wish to use 4-5 hour forecasted fields from an NWP grid from one of their input forcings. In that instance the offset would be 4 hours, but 0 for other remaining forcings. + Example- ForecastInputOffsets: [0, 0] + """ + return self._fcst_input_offsets + + @fcst_input_offsets.setter + def fcst_input_offsets(self, value: list) -> None: if not self.precip_only_flag: - # Read in the ForecastInputOffsets options. - try: - self.fcst_input_offsets = cfg_bmi["ForecastInputOffsets"] - except KeyError as e: - err_out_screen( - "Unable to locate ForecastInputOffsets under Forecast section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ForecastInputOffsets under Forecast section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper ForecastInputOffsets option specified in the configuration file.", - e, - ) - if len(self.fcst_input_offsets) != self.number_inputs: - err_out_screen( - "Please specify ForecastInputOffset values for each corresponding input forcings for forecasts." - ) - # Check to make sure the input offset options make sense. There will be additional - # checking later when input choices are mapped to input products. - for inputOffset in self.fcst_input_offsets: - if inputOffset < 0: - err_out_screen( - "Please specify ForecastInputOffset values greater than or equal to zero." - ) + self.check_number_of_inputs_forcings(value, "ForecastInputOffsets") + self.check_input_values_non_negative(value, "ForecastInputOffsets") + self._fcst_input_offsets = value - # Calculate the length of the forecast cycle, based on the maximum - # length of the input forcing length chosen by the user. - self.cycle_length_minutes = max(self.fcst_input_horizons) + @property + def cycle_length_minutes(self) -> int: + """Get the forecast cycle length in minutes, which is calculated based on the maximum of the forecast input horizons specified by the user in the configuration file. - # Ensure the number maximum cycle length is an equal divider of the output - # time step specified by the user. - if self.cycle_length_minutes % self.output_freq != 0: + Ensure the number maximum cycle length is an equal divider of the output time step specified by the user. + """ + cycle_len = max(self.fcst_input_horizons) + if cycle_len % self.output_freq != 0: err_out_screen( "Please specify an output time step that is an equal divider of the maximum of the forecast time horizons specified." ) + return cycle_len + @property + def num_output_steps(self) -> int: + """Calculate the number of output time steps per forecast cycle based on the forecast cycle length and the output frequency specified by the user in the configuration file.""" if self.sub_output_hour is None: - # Calculate the number of output time steps per forecast cycle. - self.num_output_steps = int(self.cycle_length_minutes / self.output_freq) - if self.precip_only_flag: - self.num_supp_output_steps = ( - int(self.cycle_length_minutes) / self.customSuppPcpFreq - ) - if self.ana_flag: - self.actual_output_steps = np.int32(self.nFcsts) - else: - self.actual_output_steps = np.int32(self.num_output_steps) + num_steps = int(self.cycle_length_minutes / self.output_freq) else: - # Calculate the number of output time steps per forecast cycle. - self.num_output_steps = ( + num_steps = ( int( (self.cycle_length_minutes - (self.sub_output_hour * 60)) / self.sub_output_freq @@ -870,826 +843,436 @@ def validate_config(self, cfg_bmi: dict) -> None: + int((self.sub_output_hour * 60) / self.output_freq) - 1 ) - if self.precip_only_flag: - self.num_supp_output_steps = ( - int(self.cycle_length_minutes) / self.customSuppPcpFreq - ) - if self.ana_flag: - self.actual_output_steps = np.int32(self.nFcsts) - else: - self.actual_output_steps = np.int32(self.num_output_steps) + return num_steps - # Process the grid type - try: - self.grid_type = cfg_bmi["GRID_TYPE"] - except KeyError as e: - err_out_screen("Unable to locate GRID_TYPE in the configuration file.", e) - except configparser.NoOptionError as e: - err_out_screen("Unable to locate GRID_TYPE in the configuration file.", e) - if ( - self.grid_type.lower() != "gridded" - and self.grid_type.lower() != "unstructured" - and self.grid_type.lower() != "hydrofabric" - ): - err_out_screen( - 'GRID_TYPE in the configuration file only accepts "unstructured", "gridded", or "hydrofabric" as options.' - ) + @property + def num_supp_output_steps(self) -> int: + """Calculate the number of supplemental precip output time steps per forecast cycle based on the forecast cycle length and the custom supplemental precip output frequency specified by the user in the configuration file.""" + if self.precip_only_flag: + return int(self.cycle_length_minutes / self.customSuppPcpFreq) - if self.grid_type.lower() == "gridded": - # Process the geogrid variable information - try: - self.lon_var = cfg_bmi["LONVAR"] - except KeyError as e: - err_out_screen("Unable to locate LONVAR in the configuration file.", e) - except configparser.NoOptionError as e: - err_out_screen("Unable to locate LONVAR in the configuration file.", e) - try: - self.lat_var = cfg_bmi["LATVAR"] - except KeyError as e: - err_out_screen("Unable to locate LATVAR in the configuration file.", e) - except configparser.NoOptionError as e: - err_out_screen("Unable to locate LATVAR in the configuration file.", e) - - elif self.grid_type.lower() == "unstructured": - # Process the geogrid variable information - try: - self.nodecoords_var = cfg_bmi["NodeCoords"] - except KeyError as e: - err_out_screen( - "Unable to locate NodeCoords for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate NodeCoords for unstructured mesh in the configuration file.", - e, - ) - try: - self.elemcoords_var = cfg_bmi["ElemCoords"] - except KeyError as e: - err_out_screen( - "Unable to locate ElemCoords for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ElemCoords for unstructured mesh in the configuration file.", - e, - ) - try: - self.elemconn_var = cfg_bmi["ElemConn"] - except KeyError as e: - err_out_screen( - "Unable to locate ElemConn for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ElemConn for unstructured mesh in the configuration file.", - e, - ) - try: - self.numelemconn_var = cfg_bmi["NumElemConn"] - except KeyError as e: - err_out_screen( - "Unable to locate NumElemConn for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate NumElemConn for unstructured mesh in the configuration file.", - e, - ) + @property + def actual_output_steps(self) -> int: + """Calculate the actual number of output time steps per forecast cycle based on whether the user has chosen to run a reforecast simulation with a specified processing window, which will only output time steps for which input forcings are available based on the processing window and forecast time horizons specified by the user in the configuration file.""" + if self.ana_flag: + return np.int32(self.nFcsts) + else: + return np.int32(self.num_output_steps) - elif self.grid_type.lower() == "hydrofabric": - # Process the geogrid variable information - try: - self.nodecoords_var = cfg_bmi["NodeCoords"] - except KeyError as e: - err_out_screen( - "Unable to locate NodeCoords for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate NodeCoords for unstructured mesh in the configuration file.", - e, - ) - try: - self.elemcoords_var = cfg_bmi["ElemCoords"] - except KeyError as e: - err_out_screen( - "Unable to locate ElemCoords for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ElemCoords for unstructured mesh in the configuration file.", - e, - ) - try: - self.element_id_var = cfg_bmi["ElemID"] - except KeyError as e: - err_out_screen( - "Unable to locate ElemID for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ElemID for unstructured mesh in the configuration file.", - e, - ) - try: - self.elemconn_var = cfg_bmi["ElemConn"] - except KeyError as e: - err_out_screen( - "Unable to locate ElemConn for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ElemConn for unstructured mesh in the configuration file.", - e, - ) - try: - self.numelemconn_var = cfg_bmi["NumElemConn"] - except KeyError as e: - err_out_screen( - "Unable to locate NumElemConn for unstructured mesh in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate NumElemConn for unstructured mesh in the configuration file.", - e, - ) + @property + def grid_type(self) -> str: + """Tells the NextGen Forcings Engine BMI which grid type the engine is initalizing as a BMI instance. This is a required field and the proper string values should be "gridded", "hydrofabric", or "unstructured". + + Example- GRID_TYPE: "gridded" + """ + return self._grid_type - # Process geospatial information + @grid_type.setter + def grid_type(self, value: str) -> None: + """Set the grid type specified by the user in the configuration file. This is used to control how the program reads in and processes the geogrid information for regridding input forcings based on the grid type specified by the user in the configuration file.""" + self.check_input_values_in_range( + [value.lower()], "GRID_TYPE", ["gridded", "unstructured", "hydrofabric"] + ) + self._grid_type = value.lower() + + @property + def lon_var(self) -> str: + """Naming convention of the longitude variable within the "GeogridIn" file the user has specified. Variable naming convention ONLY for gridded domain configurations. This is required so the NextGen Forcings Engine BMI can dyanmically initialize the domain geogrid as an ESMF regridding object. In the case for "gridded" domain configuration options and a user specifying downscaling options while only specifying a height variable feature on the grid, this netcdf variable (LONVAR) is then EXPECTED to contain a netcdf metadata attribute called "dx" that specifies the grid spacing in the longtiudinal direction. Otherwise, it will throw an error and not be able to calculate the slope and tilt of each grid cell. - if self.geogrid: - LOG.debug(f"Geogrid: {self.geogrid}") + Example- LONVAR: "XLONG_M" + """ + if self.grid_type == "gridded": + return self.extract_input_variable("LONVAR") + + @property + def lat_var(self) -> str: + """Naming convention of the latitude variable within the "GeogridIn" file the user has specified. Variable naming convention ONLY for gridded domain configurations. This is required so the NextGen Forcings Engine BMI can dyanmically initialize the domain geogrid as an ESMF regridding object. In the case for "gridded" domain configuration options and a user specifying downscaling options while only specifying a height variable feature on the grid, this netcdf variable (LATVAR) is then EXPECTED to contain a netcdf metadata attribute called "dy" that specifies the grid spacing in the latitudinal direction. Otherwise, it will throw an error and not be able to calculate the slope and tilt of each grid cell. + + Example- LATVAR: "XLAT_M" + """ + if self.grid_type == "gridded": + return self.extract_input_variable("LATVAR") + + @property + def nodecoords_var(self) -> str: + """Naming convention of the node coordinates variable within the "GeogridIn" file the user has specified for ONLY an unstructured mesh or the NextGen hydrofabric. This is a 2-D array stating the latitude and longitude coordinates for all the nodes in the mesh. This is required so the NextGen Forcings Engine BMI can dyanmically initialize the domain geogrid as an ESMF regridding object. + + Example- NodeCoods: "nodecoords" + """ + if self.grid_type in ["unstructured", "hydrofabric"]: + return self.extract_input_variable("NodeCoords") + + @property + def elemcoords_var(self) -> str: + """Naming convention of the element coordinates variable within the "GeogridIn" file the user has specified for ONLY an unstructured mesh or the NextGen hydrofabric. This is a 2-D array stating the latitude and longitude coordinates for all the elements in the mesh. This is required so the NextGen Forcings Engine BMI can dyanmically initialize the domain geogrid as an ESMF regridding object. + + Example- ElemCoods: "elemcoords" + """ + if self.grid_type in ["unstructured", "hydrofabric"]: + return self.extract_input_variable("ElemCoords") + + @property + def elemconn_var(self) -> str: + """Naming convention of the element connectivity variable within the "GeogridIn" file the user has specified for ONLY an unstructured mesh or the NextGen hydrofabric. This is a 2-D array stating the node ids for each element connecting the entire mesh structure. This is required so the NextGen Forcings Engine BMI can dyanmically initialize the domain geogrid as an ESMF regridding object. + + Example- ElemConn: "elemconn" + """ + if self.grid_type in ["unstructured", "hydrofabric"]: + return self.extract_input_variable("ElemConn") + + @property + def numelemconn_var(self) -> str: + """Naming convention of the number of nodes per element variable within the "GeogridIn" file the user has specified for ONLY an unstructured mesh or the NextGen hydrofabric. This is a 1-D array stating the how many nodes are connecting each element within the unstructured mesh. This is required so the NextGen Forcings Engine BMI can dyanmically initialize the domain geogrid as an ESMF regridding object. + + Example- NumElemConn: "numelemconn" + """ + if self.grid_type in ["unstructured", "hydrofabric"]: + return self.extract_input_variable("NumElemConn") + + @property + def element_id_var(self) -> str: + """Naming convention of the element id variable within the "GeogridIn" file the user has specified for ONLY the NextGen hydrofabric. This is a 1-D array stating the catchment id numeric naming convention within the "divides" geopackage layer of a given NextGen hydrofabric file. This variable is required in order for the NextGen Forcings Engine to properly advertise the element ids of the unstructured mesh linked to the NextGen hydrofabric catchment ids. + + Example- ElemID: "element_ids" + """ + if self.grid_type == "hydrofabric": + return self.extract_input_variable("ElemID") + + @property + def ignored_border_widths(self) -> list: + """Border width (in grid cells) to ignore for each input dataset. NOTE: generally, the first input forcing should always be zero or there will be missing data in the final output. + + Example- IgnoredBorderWidths: [0,10] + """ + return self._ignored_border_widths + + @ignored_border_widths.setter + def ignored_border_widths(self, value: list) -> None: + """Set the list of ignored border widths specified by the user in the configuration file. This is used to control how the program processes input forcings based on the ignored border widths specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "IgnoredBorderWidths") + self.check_input_values_non_negative(value, "IgnoredBorderWidths") + self._ignored_border_widths = value + + @property + def regrid_opt(self): + """Choose regridding options for each input forcing files being used. Options available are: 1 - ESMF Bilinear, 2 - ESMF Nearest Neighbor, 3 - ESMF Conservative Bilinear. + + Example- RegridOpt: [1,1] + """ + return self._regrid_opt + + @regrid_opt.setter + def regrid_opt(self, value: list) -> None: + """Set the list of regridding options specified by the user in the configuration file. This is used to control how input forcings are regridded based on the regridding option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "RegridOpt") + self.check_input_values_in_range(value, "RegridOpt", [1, 2, 3]) + self._regrid_opt = value else: - try: - self.geogrid = cfg_bmi["GeogridIn"] - except KeyError as e: - err_out_screen( - "Unable to locate GeogridIn in the configuration file.", e - ) - except configparser.NoOptionError as e: + self._regrid_opt = None + + @property + def weightsDir(self) -> str: + """Get the pathway to the ESMF weights directory specified by the user in the configuration file. This is used to control where the program looks for ESMF weights files if the user has chosen to use pre-generated ESMF weights files for regridding input forcings in the configuration file.""" + return self._weightsDir + + @weightsDir.setter + def weightsDir(self, value: str) -> None: + """Set the pathway to the ESMF weights directory specified by the user in the configuration file. This is used to control where the program looks for ESMF weights files if the user has chosen to use pre-generated ESMF weights files for regridding input forcings in the configuration file.""" + if not self.precip_only_flag: + if value is not None and not os.path.exists(value): err_out_screen( - "Unable to locate GeogridIn in the configuration file.", e + f"ESMF Weights file directory specified ({value}) but does not exist" ) + self._weightsDir = value - # Check for the optional geospatial land metadata file. - try: - self.spatial_meta = cfg_bmi["SpatialMetaIn"] - except KeyError as e: - err_out_screen( - "Unable to locate SpatialMetaIn in the configuration file.", e + @property + def forceTemoralInterp(self) -> list: + """Get the list of forcing temporal interpolation options specified by the user in the configuration file. This is used to control how input forcings are temporally interpolated based on the temporal interpolation option specified for each input forcing in the configuration file.""" + return self._forceTemoralInterp + + @forceTemoralInterp.setter + def forceTemoralInterp(self, value: list) -> None: + """Specify an temporal interpolation for the forcing variables. Interpolation will be done between the two neighboring input forcing states that exist. If only one nearest state exist (I.E. only a state forward in time, or behind), then that state will be used as a "nearest neighbor". NOTE - All input options here must be of the same length of the input forcing number. Also note all temporal interpolation occurs BEFORE downscaling and bias correction. 0 - No temporal interpolation. 1 - Nearest Neighbor, 2 - Linear weighted, average. + + Example- ForcingTemporalInterpolation: [0,0] + """ + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "ForcingTemporalInterpolation") + self.check_input_values_in_range( + value, "ForcingTemporalInterpolation", [0, 1, 2] ) - if len(self.spatial_meta) == 0: - # No spatial metadata file found. - self.spatial_meta = None - else: - if not os.path.isfile(self.spatial_meta): - err_out_screen( - "Unable to locate optional spatial metadata file: " - + self.spatial_meta - ) + self._forceTemoralInterp = value + + @property + def t2dDownscaleOpt(self) -> list: + """Specify a temperature downscaling method: 0 - No downscaling, 1 - Use a simple lapse rate of 6.75 degrees Celsius to get from the model elevation to the WRF-Hydro elevation, 2 - Use a pre-calculated lapse rate regridded to the WRF-Hydro domain (only NWM), 3 - Use a dynamic lapse rate calculated at each timstep. + + Example- TemperatureDownscaling: [3, 3] + """ + return self._t2dDownscaleOpt + @t2dDownscaleOpt.setter + def t2dDownscaleOpt(self, value: list) -> None: + """Set the list of temperature downscaling options specified by the user in the configuration file. This is used to control how temperature input forcings are downscaled based on the temperature downscaling option specified for each input forcing in the configuration file.""" if not self.precip_only_flag: - # Check for the IgnoredBorderWidths - try: - self.ignored_border_widths = cfg_bmi["IgnoredBorderWidths"] - except (KeyError, configparser.NoOptionError): - # if didn't specify, no worries, just set to 0 - self.ignored_border_widths = [0.0] * self.number_inputs - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper IgnoredBorderWidths option specified in the configuration file." - "({} was supplied".format( - cfg_bmi["Geospatial"]["IgnoredBorderWidths"] - ), - e, - ) - if len(self.ignored_border_widths) != self.number_inputs: - err_out_screen( - "Please specify IgnoredBorderWidths values for each " - "corresponding input forcings for SuppForcing." - "({} was supplied".format(self.ignored_border_widths) - ) - if any(map(lambda x: x < 0, self.ignored_border_widths)): - err_out_screen( - "Please specify IgnoredBorderWidths values greater than or equal to zero:" - "({} was supplied".format(self.ignored_border_widths) - ) + self.check_number_of_inputs_forcings(value, "TemperatureDownscaling") + self.check_input_values_in_range(value, "TemperatureDownscaling", [0, 1, 2]) + count = 0 + for opt in value: + if opt == 2: + self.param_flag[count] = 1 + count += 1 + self._t2dDownscaleOpt = value + @property + def psfcDownscaleOpt(self) -> list: + """Specify a surface pressure downscaling method: 0 - No downscaling, 1 - Use input elevation and WRF-Hydro elevation to downscale surface pressure. + + Example- PressureDownscaling: [1, 1] + """ + return self._psfcDownscaleOpt + + @psfcDownscaleOpt.setter + def psfcDownscaleOpt(self, value: list) -> None: + """Set the list of pressure downscaling options specified by the user in the configuration file. This is used to control how pressure input forcings are downscaled based on the pressure downscaling option specified for each input forcing in the configuration file.""" if not self.precip_only_flag: - # Process regridding options. - try: - self.regrid_opt = cfg_bmi["RegridOpt"] - except KeyError as e: - err_out_screen( - "Unable to locate RegridOpt under the Regridding section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate RegridOpt under the Regridding section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper RegridOpt options specified in the configuration file.", e - ) - if len(self.regrid_opt) != self.number_inputs: - err_out_screen( - "Please specify RegridOpt values for each corresponding input forcings in the configuration file.", - e, - ) - # Check to make sure regridding options makes sense. - for regridOpt in self.regrid_opt: - if regridOpt < 1 or regridOpt > 3: - err_out_screen( - "Invalid RegridOpt chosen in the configuration file. Please choose a " - "value of 1-2 for each corresponding input forcing." - ) - try: - # Read weight file directory (optional) - self.weightsDir = cfg_bmi["RegridWeightsDir"] - except Exception: - # Set wieghtsDir to None; this will create regrid object in memory - self.weightsDir = None - if self.weightsDir: - # if we do have one specified, make sure it exists - if not os.path.exists(self.weightsDir): - err_out_screen( - "ESMF Weights file directory specified ({}) but does not exist" - ).format(self.weightsDir) + self.check_number_of_inputs_forcings(value, "PressureDownscaling") + self.check_input_values_in_range(value, "PressureDownscaling", [0, 1]) + self._psfcDownscaleOpt = value - # Calculate the beginning/ending processing dates if we are running realtime - if self.realtime_flag: - calculate_lookback_window(self) + @property + def swDownscaleOpt(self) -> list: + """Specify a shortwave radiation downscaling routine. 0 - No downscaling, 1 - Run a topographic adjustment using the WRF-Hydro elevation. - # Create temporary array to hold flags if we need input parameter files. - param_flag = np.empty([len(self.input_forcings)], int) - param_flag[:] = 0 + Example- ShortwaveDownscaling: [1, 1] + """ + return self._swDownscaleOpt + + @swDownscaleOpt.setter + def swDownscaleOpt(self, value: list) -> None: + """Set the list of shortwave downscaling options specified by the user in the configuration file. This is used to control how shortwave radiation input forcings are downscaled based on the shortwave downscaling option specified for each input forcing in the configuration file.""" if not self.precip_only_flag: - # Read in temporal interpolation options. - try: - self.forceTemoralInterp = cfg_bmi["ForcingTemporalInterpolation"] - except KeyError as e: - err_out_screen( - "Unable to locate ForcingTemporalInterpolation under the Interpolation section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ForcingTemporalInterpolation under the Interpolation section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper ForcingTemporalInterpolation options specified in the configuration file.", - e, - ) - if len(self.forceTemoralInterp) != self.number_inputs: - err_out_screen( - "Please specify ForcingTemporalInterpolation values for each corresponding input forcings in the configuration file." - ) - # Ensure the forcingTemporalInterpolation values make sense. - for temporalInterpOpt in self.forceTemoralInterp: - if temporalInterpOpt < 0 or temporalInterpOpt > 2: - err_out_screen( - "Invalid ForcingTemporalInterpolation chosen in the configuration file. " - "Please choose a value of 0-2 for each corresponding input forcing." - ) + self.check_number_of_inputs_forcings(value, "ShortwaveDownscaling") + self.check_input_values_in_range(value, "ShortwaveDownscaling", [0, 1]) + self._swDownscaleOpt = value - # Read in the temperature downscaling options. - try: - self.t2dDownscaleOpt = cfg_bmi["TemperatureDownscaling"] - except KeyError as e: - err_out_screen( - "Unable to locate TemperatureDownscaling under the Downscaling section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate TemperatureDownscaling under the Downscaling section of the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper TemperatureDownscaling options specified in the configuration file.", - e, - ) - if len(self.t2dDownscaleOpt) != self.number_inputs: - err_out_screen( - "Please specify TemperatureDownscaling values for each corresponding input forcings in the configuration file." - ) - # Ensure the downscaling options chosen make sense. - count_tmp = 0 - for optTmp in self.t2dDownscaleOpt: - if optTmp < 0 or optTmp > 2: - err_out_screen( - "Invalid TemperatureDownscaling options specified in the configuration file." - ) - if optTmp == 2: - param_flag[count_tmp] = 1 - count_tmp = count_tmp + 1 + @property + def q2dDownscaleOpt(self) -> list: + """Specify a specific humidity downscaling routine. 0 - No downscaling, 1 - Use regridded humidity, along with downscaled temperature/pressure to extrapolate a downscaled surface specific humidty. - # Read in the pressure downscaling options. - try: - self.psfcDownscaleOpt = cfg_bmi["PressureDownscaling"] - except KeyError as e: - err_out_screen( - "Unable to locate PressureDownscaling under the Downscaling section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate PressureDownscaling under the Downscaling section of the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper PressureDownscaling options specified in the configuration file." - ) - if len(self.psfcDownscaleOpt) != self.number_inputs: - err_out_screen( - "Please specify PressureDownscaling values for each corresponding input forcings in the configuration file." - ) - # Ensure the downscaling options chosen make sense. - for optTmp in self.psfcDownscaleOpt: - if optTmp < 0 or optTmp > 1: - err_out_screen( - "Invalid PressureDownscaling options specified in the configuration file." - ) + Example- HumidityDownscaling: [1, 1] + """ + return self._q2dDownscaleOpt - # Read in the shortwave downscaling options - try: - self.swDownscaleOpt = cfg_bmi["ShortwaveDownscaling"] - except KeyError as e: - err_out_screen( - "Unable to locate ShortwaveDownscaling under the Downscaling section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate ShortwaveDownscaling under the Downscaling section of the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper ShortwaveDownscaling options specified in the configuration file.", - e, - ) - if len(self.swDownscaleOpt) != self.number_inputs: - err_out_screen( - "Please specify ShortwaveDownscaling values for each corresponding input forcings in the configuration file." - ) - # Ensure the downscaling options chosen make sense. - for optTmp in self.swDownscaleOpt: - if optTmp < 0 or optTmp > 1: - err_out_screen( - "Invalid ShortwaveDownscaling options specified in the configuration file." - ) + @q2dDownscaleOpt.setter + def q2dDownscaleOpt(self, value: list) -> None: + """Set the list of humidity downscaling options specified by the user in the configuration file. This is used to control how humidity input forcings are downscaled based on the humidity downscaling option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "HumidityDownscaling") + self.check_input_values_in_range(value, "HumidityDownscaling", [0, 1]) + self._q2dDownscaleOpt = value - # Read in humidity downscaling options. - try: - self.q2dDownscaleOpt = cfg_bmi["HumidityDownscaling"] - except KeyError as e: - err_out_screen( - "Unable to locate HumidityDownscaling under the Downscaling section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate HumidityDownscaling under the Downscaling section of the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper HumidityDownscaling options specified in the configuration file.", - e, - ) - if len(self.q2dDownscaleOpt) != self.number_inputs: - err_out_screen( - "Please specify HumidityDownscaling values for each corresponding " - "input forcings in the configuration file." - ) - # Ensure the downscaling options chosen make sense. - for optTmp in self.q2dDownscaleOpt: - if optTmp < 0 or optTmp > 1: - err_out_screen( - "Invalid HumidityDownscaling options specified in the configuration file." - ) + @property + def precipDownscaleOpt(self) -> list: + """Specify a precipitation downscaling routine. 0 - No downscaling, 1 - NWM mountain mapper downscaling using monthly PRISM climo. - # Read in the precipitation downscaling options - try: - self.precipDownscaleOpt = cfg_bmi["PrecipDownscaling"] - except KeyError as e: - err_out_screen( - "Unable to locate PrecipDownscaling under the Downscaling section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate PrecipDownscaling under the Downscaling section of the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper PrecipDownscaling options specified in the configuration file.", - e, - ) + Example- PrecipDownscaling: [0, 0] + """ + return self._precipDownscaleOpt + + @precipDownscaleOpt.setter + def precipDownscaleOpt(self, value: list) -> None: + """Set the list of precipitation downscaling options specified by the user in the configuration file. This is used to control how precipitation input forcings are downscaled based on the precipitation downscaling option specified for each input forcing in the configuration file.""" if not self.precip_only_flag: - if len(self.precipDownscaleOpt) != self.number_inputs: - err_out_screen( - "Please specify PrecipDownscaling values for each corresponding " - "input forcings in the configuration file." - ) - # Ensure the downscaling options chosen make sense. - count_tmp = 0 - for optTmp in self.precipDownscaleOpt: - if optTmp < 0 or optTmp > 1: - err_out_screen( - "Invalid PrecipDownscaling options specified in the configuration file." - ) - if optTmp == 1: - param_flag[count_tmp] = 1 - count_tmp = count_tmp + 1 + self.check_number_of_inputs_forcings(value, "PrecipDownscaling") + self.check_input_values_in_range(value, "PrecipDownscaling", [0, 1]) + count = 0 + for opt in value: + if opt == 1: + self.param_flag[count] = 1 + count += 1 + self._precipDownscaleOpt = value - # Read in the downscaling parameter directory. - try: - self.dScaleParamDirs = cfg_bmi["DownscalingParamDirs"] - except KeyError as e: - err_out_screen( - "Unable to locate DownscalingParamDirs in the configuration file.", e - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate DownscalingParamDirs in the configuration file.", e - ) - if len(self.dScaleParamDirs) != len(self.input_forcings): - err_out_screen( - "Please specify a downscaling parameter directory for each " - "corresponding downscaling option that requires one." - ) - # Loop through each downscaling parameter directory and make sure they exist. - for dirTmp in range(0, len(self.dScaleParamDirs)): - if not os.path.isdir(self.dScaleParamDirs[dirTmp]): + @property + def dScaleParamDirs(self) -> list: + """Specify the input parameter directory containing necessary downscaling grids. This is ONLY needed for the original NWM WRF-Hydro domain. Otherwise, just point it to a random directory and it will be ignored. + + Example- DownscalingParamDirs: ["./forcingParam/AnA", "./forcingParam/AnA"] + """ + return self._dScaleParamDirs + + @dScaleParamDirs.setter + def dScaleParamDirs(self, value: list) -> None: + """Set the list of downscaling parameter directories specified by the user in the configuration file. This is used to control where the program looks for downscaling parameter files for each input forcing based on the downscaling parameter directory specified for each input forcing in the configuration file.""" + self.check_number_of_inputs_forcings(value, "DownscalingParamDirs") + for dirTmp in range(0, len(value)): + dir_path = value[dirTmp] + if not os.path.isdir(dir_path): err_out_screen( - "Unable to locate parameter directory: " - + os.path.abspath(self.dScaleParamDirs[dirTmp]) + f"Unable to locate parameter directory: {os.path.abspath(dir_path)}" ) + self._dScaleParamDirs = value + @property + def perform_downscaling(self) -> bool: + """Determine whether downscaling of input forcings is necessary based on the downscaling options specified by the user for each input forcing in the configuration file.""" if ( - [1] in self.q2dDownscaleOpt - or [1] in self.swDownscaleOpt - or [1] in self.psfcDownscaleOpt - or [1, 2] in self.t2dDownscaleOpt + 1 in self.q2dDownscaleOpt + or 1 in self.swDownscaleOpt + or 1 in self.psfcDownscaleOpt + or 1 in self.t2dDownscaleOpt + or 2 in self.t2dDownscaleOpt ): - # Process the geogrid information for downscaling - try: - self.sinalpha_var = cfg_bmi["SINALPHA"] - except Exception: - self.sinalpha_var = None - try: - self.cosalpha_var = cfg_bmi["COSALPHA"] - except Exception: - self.cosalpha_var = None - if self.grid_type.lower() == "hydrofabric": - try: - self.slope_var = cfg_bmi["SLOPE"] - except KeyError as e: - err_out_screen( - "Unable to locate SLOPE variable in the hydrofabric configuration file. Required variable since user turned on a downscaling option.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SLOPE variable in the hydrofabric configuration file. Required variable since user turned on a downscaling option.", - e, - ) - try: - self.slope_azimuth_var = cfg_bmi["SLOPE_AZIMUTH"] - except KeyError as e: - err_out_screen( - "Unable to locate SLOPE_AZIMUTH variable in the hydrofabric configuration file. Required variable since user turned on a downscaling option.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SLOPE_AZIMUTH variable in the hydrofabric configuration file. Required variable since user turned on a downscaling option.", - e, - ) - else: - try: - self.slope_var = cfg_bmi["SLOPE"] - except Exception: - self.slope_var = None - try: - self.slope_azimuth_var = cfg_bmi["SLOPE_AZIMUTH"] - except Exception: - self.slope_azimuth_var = None - if self.grid_type.lower() == "unstructured": - try: - self.slope_var_elem = cfg_bmi["SLOPE_ELEM"] - except Exception: - self.slope_var_elem = None - try: - self.slope_azimuth_var_elem = cfg_bmi["SLOPE_AZIMUTH_ELEM"] - except Exception: - self.slope_azimuth_var_elem = None - - if self.grid_type.lower() == "unstructured": - try: - self.hgt_elem_var = cfg_bmi["HGTVAR_ELEM"] - except KeyError as e: - err_out_screen( - "Unable to locate HGTVAR_ELEM in the configuration file. Required variable since user turned on a downscaling option.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate HGTVAR_ELEM in the configuration file. Required variable since user turned on a downscaling option.", - e, - ) + return True - try: - self.hgt_var = cfg_bmi["HGTVAR"] - except KeyError as e: - err_out_screen( - "Unable to locate HGTVAR in the configuration file. Required variable since user turned on a downscaling option.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate HGTVAR in the configuration file. Required variable since user turned on a downscaling option.", - e, - ) + @property + def t2BiasCorrectOpt(self) -> list: + """Specify a temperature bias correction method. 0 - No bias correction, 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY, 2 - Custom NCAR bias-correction based on HRRRv3 analysis - based on hour of day (USE WITH CAUTION), 3 - NCAR parametric GFS bias correction, 4 - NCAR parametric HRRR bias correction. - # * Bias Correction Options * + Example- TemperatureBiasCorrection: [0, 4] + """ + return self._t2BiasCorrectOpt + + @t2BiasCorrectOpt.setter + def t2BiasCorrectOpt(self, value: list) -> None: + """Set the list of temperature bias correction options specified by the user in the configuration file. This is used to control how temperature input forcings are bias corrected based on the temperature bias correction option specified for each input forcing in the configuration file.""" if not self.precip_only_flag: - # Read in temperature bias correction options - try: - self.t2BiasCorrectOpt = cfg_bmi["TemperatureBiasCorrection"] - except KeyError as e: - err_out_screen( - "Unable to locate TemperatureBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate TemperatureBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper TemperatureBiasCorrection options specified in the configuration file.", - e, - ) - if len(self.t2BiasCorrectOpt) != self.number_inputs: - err_out_screen( - "Please specify TemperatureBiasCorrection values for each corresponding input forcings in the configuration file." - ) - # Ensure the bias correction options chosen make sense. - for optTmp in self.t2BiasCorrectOpt: - if optTmp < 0 or optTmp > 4: - err_out_screen( - "Invalid TemperatureBiasCorrection options specified in the configuration file." - ) + self.check_number_of_inputs_forcings(value, "TemperatureBiasCorrection") + self.check_input_values_in_range( + value, "TemperatureBiasCorrection", [0, 1, 2, 3, 4] + ) + self._t2BiasCorrectOpt = value - # Read in surface pressure bias correction options. - try: - self.psfcBiasCorrectOpt = cfg_bmi["PressureBiasCorrection"] - except KeyError as e: - err_out_screen( - "Unable to locate PressureBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate PressureBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper PressureBiasCorrection options specified in the configuration file.", - e, - ) - if len(self.psfcDownscaleOpt) != self.number_inputs: - err_out_screen( - "Please specify PressureBiasCorrection values for each corresponding input forcings in the configuration file." - ) - # Ensure the bias correction options chosen make sense. - for optTmp in self.psfcBiasCorrectOpt: - if optTmp < 0 or optTmp > 1: - err_out_screen( - "Invalid PressureBiasCorrection options specified in the configuration file." - ) - if optTmp == 1: - # We are running NWM-Specific bias-correction of CFSv2 that needs to take place prior to regridding. - self.runCfsNldasBiasCorrect = True + @property + def psfcBiasCorrectOpt(self) -> list: + """Specify a surface pressure bias correction method. 0 - No bias correction, 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY. - # Read in humidity bias correction options. - try: - self.q2BiasCorrectOpt = cfg_bmi["HumidityBiasCorrection"] - except KeyError as e: - err_out_screen( - "Unable to locate HumidityBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate HumidityBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper HumdityBiasCorrection options specified in the configuration file.", - e, - ) - if len(self.q2BiasCorrectOpt) != self.number_inputs: - err_out_screen( - "Please specify HumidityBiasCorrection values for each corresponding input forcings in the configuration file." - ) - # Ensure the bias correction options chosen make sense. - for optTmp in self.q2BiasCorrectOpt: - if optTmp < 0 or optTmp > 2: - err_out_screen( - "Invalid HumidityBiasCorrection options specified in the configuration file." - ) - if optTmp == 1: - # We are running NWM-Specific bias-correction of CFSv2 that needs to take place prior to regridding. - self.runCfsNldasBiasCorrect = True + Example- PressureBiasCorrection: [0,0] + """ + return self._psfcBiasCorrectOpt - # Read in wind bias correction options. - try: - self.windBiasCorrect = cfg_bmi["WindBiasCorrection"] - except KeyError as e: - err_out_screen( - "Unable to locate WindBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate WindBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper WindBiasCorrection options specified in the configuration file.", - e, - ) - if len(self.windBiasCorrect) != self.number_inputs: - err_out_screen( - "Please specify WindBiasCorrection values for each corresponding input forcings in the configuration file." - ) - # Ensure the bias correction options chosen make sense. - for optTmp in self.windBiasCorrect: - if optTmp < 0 or optTmp > 4: - err_out_screen( - "Invalid WindBiasCorrection options specified in the configuration file." - ) - if optTmp == 1: - # We are running NWM-Specific bias-correction of CFSv2 that needs to take place prior to regridding. - self.runCfsNldasBiasCorrect = True + @psfcBiasCorrectOpt.setter + def psfcBiasCorrectOpt(self, value: list) -> None: + """Set the list of pressure bias correction options specified by the user in the configuration file. This is used to control how pressure input forcings are bias corrected based on the pressure bias correction option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "PressureBiasCorrection") + self.check_input_values_in_range(value, "PressureBiasCorrection", [0, 1]) + self._psfcBiasCorrectOpt = value - # Read in shortwave radiation bias correction options. - try: - self.swBiasCorrectOpt = cfg_bmi["SwBiasCorrection"] - except KeyError as e: - err_out_screen( - "Unable to locate SwBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SwBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper SwBiasCorrection options specified in the configuration file.", - e, - ) - if len(self.swBiasCorrectOpt) != self.number_inputs: - err_out_screen( - "Please specify SwBiasCorrection values for each corresponding input forcings in the configuration file." - ) - # Ensure the bias correction options chosen make sense. - for optTmp in self.swBiasCorrectOpt: - if optTmp < 0 or optTmp > 2: - err_out_screen( - "Invalid SwBiasCorrection options specified in the configuration file." - ) - if optTmp == 1: - # We are running NWM-Specific bias-correction of CFSv2 that needs to take place prior to regridding. - self.runCfsNldasBiasCorrect = True + @property + def q2BiasCorrectOpt(self): + """Specify a specific humidity bias correction method. 0 - No bias correction, 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY, 2 - Custom NCAR bias-correction based on HRRRv3 analysis - based on hour of day (USE WITH CAUTION). - # Read in longwave radiation bias correction options. - try: - self.lwBiasCorrectOpt = cfg_bmi["LwBiasCorrection"] - except KeyError as e: - err_out_screen( - "Unable to locate LwBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate LwBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper LwBiasCorrection options specified in the configuration file.", - e, - ) - if len(self.lwBiasCorrectOpt) != self.number_inputs: - err_out_screen( - "Please specify LwBiasCorrection values for each corresponding input forcings in the configuration file." - ) - # Ensure the bias correction options chosen make sense. - for optTmp in self.lwBiasCorrectOpt: - if optTmp < 0 or optTmp > 4: - err_out_screen( - "Invalid LwBiasCorrection options specified in the configuration file." - ) - if optTmp == 1: - # We are running NWM-Specific bias-correction of CFSv2 that needs to take place prior to regridding. - self.runCfsNldasBiasCorrect = True + Example- HumidityBiasCorrection: [0,0] + """ + return self._q2BiasCorrectOpt + + @q2BiasCorrectOpt.setter + def q2BiasCorrectOpt(self, value): + """Set the list of humidity bias correction options specified by the user in the configuration file. This is used to control how humidity input forcings are bias corrected based on the humidity bias correction option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "HumidityBiasCorrection") + self.check_input_values_in_range(value, "HumidityBiasCorrection", [0, 1, 2]) + self._q2BiasCorrectOpt = value + + @property + def windBiasCorrect(self): + """Specify a wind bias correction. 0 - No bias correction, 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY, 2 - Custom NCAR bias-correction based on HRRRv3 analysis - based on hour of day (USE WITH CAUTION), 3 - NCAR parametric GFS bias correction, 4 - NCAR parametric HRRR bias correction. + + Example- WindBiasCorrection: [0, 4] + """ + return self._windBiasCorrect + + @windBiasCorrect.setter + def windBiasCorrect(self, value): + """Set the list of wind bias correction options specified by the user in the configuration file. This is used to control how wind input forcings are bias corrected based on the wind bias correction option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "WindBiasCorrection") + self.check_input_values_in_range( + value, "WindBiasCorrection", [0, 1, 2, 3, 4] + ) + self._windBiasCorrect = value + + @property + def swBiasCorrectOpt(self) -> list: + """Specify a bias correction for incoming short wave radiation flux. 0 - No bias correction, 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY, 2 - Custom NCAR bias-correction based on HRRRv3 analysis (USE WITH CAUTION). + + Example- SwBiasCorrection: [0, 2] + """ + return self._swBiasCorrectOpt + + @swBiasCorrectOpt.setter + def swBiasCorrectOpt(self, value: list) -> None: + """Set the list of shortwave radiation bias correction options specified by the user in the configuration file. This is used to control how shortwave radiation input forcings are bias corrected based on the shortwave radiation bias correction option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "SwBiasCorrection") + self.check_input_values_in_range(value, "SwBiasCorrection", [0, 1, 2]) + self._swBiasCorrectOpt = value + + @property + def lwBiasCorrectOpt(self) -> list: + """Specify a bias correction for incoming long wave radiation flux. 0 - No bias correction, 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY, 2 - Custom NCAR bias-correction based on HRRRv3 analysis, blanket adjustment (USE WITH CAUTION), 3 - NCAR parametric GFS bias correction. - # Read in precipitation bias correction options. - try: - self.precipBiasCorrectOpt = cfg_bmi["PrecipBiasCorrection"] - except KeyError as e: - err_out_screen( - "Unable to locate PrecipBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate PrecipBiasCorrection under the BiasCorrection section of the configuration file.", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper PrecipBiasCorrection options specified in the configuration file.", - e, - ) - if not self.precip_only_flag: - if len(self.precipBiasCorrectOpt) != self.number_inputs: - err_out_screen( - "Please specify PrecipBiasCorrection values for each corresponding input forcings in the configuration file." - ) - # Ensure the bias correction options chosen make sense. - for optTmp in self.precipBiasCorrectOpt: - if optTmp < 0 or optTmp > 1: - err_out_screen( - "Invalid PrecipBiasCorrection options specified in the configuration file." - ) - if optTmp == 1: - # We are running NWM-Specific bias-correction of CFSv2 that needs to take place prior to regridding. - self.runCfsNldasBiasCorrect = True - - # Putting a constraint here that CFSv2-NLDAS bias correction (NWM only) is chosen, it must be turned on - # for ALL variables. - if self.runCfsNldasBiasCorrect: - if ( - min(self.precipBiasCorrectOpt) != 1 - and max(self.precipBiasCorrectOpt) != 1 - ): - err_out_screen( - "CFSv2-NLDAS NWM bias correction must be activated for Precipitation under this configuration." - ) - if min(self.lwBiasCorrectOpt) != 1 and max(self.lwBiasCorrectOpt) != 1: - err_out_screen( - "CFSv2-NLDAS NWM bias correction must be activated for long-wave radiation under this configuration." - ) - if min(self.swBiasCorrectOpt) != 1 and max(self.swBiasCorrectOpt) != 1: - err_out_screen( - "CFSv2-NLDAS NWM bias correction must be activated for short-wave radiation under this configuration." - ) - if min(self.t2BiasCorrectOpt) != 1 and max(self.t2BiasCorrectOpt) != 1: - err_out_screen( - "CFSv2-NLDAS NWM bias correction must be activated for surface temperature under this configuration." - ) - if min(self.windBiasCorrect) != 1 and max(self.windBiasCorrect) != 1: - err_out_screen( - "CFSv2-NLDAS NWM bias correction must be activated for wind forcings under this configuration." - ) - if min(self.q2BiasCorrectOpt) != 1 and max(self.q2BiasCorrectOpt) != 1: - err_out_screen( - "CFSv2-NLDAS NWM bias correction must be activated for specific humidity under this configuration." - ) - if ( - min(self.psfcBiasCorrectOpt) != 1 - and max(self.psfcBiasCorrectOpt) != 1 - ): + Example- LwBiasCorrection: [0, 2] + """ + return self._lwBiasCorrectOpt + + @lwBiasCorrectOpt.setter + def lwBiasCorrectOpt(self, value: list) -> None: + """Set the list of longwave radiation bias correction options specified by the user in the configuration file. This is used to control how longwave radiation input forcings are bias corrected based on the longwave radiation bias correction option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "LwBiasCorrection") + self.check_input_values_in_range(value, "LwBiasCorrection", [0, 1, 2, 3, 4]) + self._lwBiasCorrectOpt = value + + @property + def precipBiasCorrectOpt(self): + """Specify a bias correction for precipitation. 0 - No bias correction, 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY. + + Example- PrecipBiasCorrection: [0, 0] + """ + return self._precipBiasCorrectOpt + + @precipBiasCorrectOpt.setter + def precipBiasCorrectOpt(self, value): + """Set the list of precipitation bias correction options specified by the user in the configuration file. This is used to control how precipitation input forcings are bias corrected based on the precipitation bias correction option specified for each input forcing in the configuration file.""" + if not self.precip_only_flag: + self.check_number_of_inputs_forcings(value, "PrecipBiasCorrection") + self.check_input_values_in_range(value, "PrecipBiasCorrection", [0, 1]) + self._precipBiasCorrectOpt = value + + @property + def bias_correction_properties(self) -> dict: + """Get the dictionary of bias correction properties specified by the user in the configuration file. This is used to control how input forcings are bias corrected based on the bias correction options specified for each input forcing in the configuration file.""" + return { + # "surface temperature": self.t2BiasCorrectOpt, #NOTE surface temperature was excluded from this consideration in the orignal code (5/7/2026 pre-refactor). Should it actually be included? + "surface pressure": self.psfcBiasCorrectOpt, + "specific humidity": self.q2BiasCorrectOpt, + "wind forcings": self.windBiasCorrect, + "short-wave radiation": self.swBiasCorrectOpt, + "long-wave radiation": self.lwBiasCorrectOpt, + "Precipitation": self.precipBiasCorrectOpt, + } + + @property + def runCfsNldasBiasCorrect(self) -> bool: + """Get the flag for whether to run the NWM-specific bias correction of CFSv2 input forcings specified by the user in the configuration file. This is used to control whether the NWM-specific bias correction of CFSv2 input forcings is run based on whether the user has chosen to run this bias correction in the configuration file.""" + run_cfs_nldas_bias_correct = False + for bias_option in self.bias_correction_properties.values(): + for opt in bias_option: + if opt == 1: + run_cfs_nldas_bias_correct = True + break + if run_cfs_nldas_bias_correct: + for ( + bias_correct_name, + bias_correct, + ) in self.bias_correction_properties.items(): + if min(bias_correct) != 1 and max(bias_correct) != 1: err_out_screen( - "CFSv2-NLDAS NWM bias correction must be activated for surface pressure under this configuration." + f"CFSv2-NLDAS NWM bias correction must be activated for {bias_correct_name} under this configuration." ) # Make sure we don't have any other forcings activated. This can only be ran for CFSv2. for opt_tmp in self.input_forcings: @@ -1697,424 +1280,260 @@ def validate_config(self, cfg_bmi: dict) -> None: err_out_screen( "CFSv2-NLDAS NWM bias correction can only be used in CFSv2-only configurations" ) + return run_cfs_nldas_bias_correct - # Read in supplemental precipitation options as an array of values to map. - try: - self.supp_precip_forcings = cfg_bmi["SuppPcp"] - except KeyError as e: - err_out_screen( - "Unable to locate SuppPcp under SuppForcing section in configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SuppPcp under SuppForcing section in configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen("Improper SuppPcp option specified in configuration file", e) - self.number_supp_pcp = len(self.supp_precip_forcings) + @property + def number_supp_pcp(self) -> int: + """Get the number of supplemental precipitation input forcings specified by the user in the configuration file. This is used to control how many supplemental precipitation input forcings are processed based on the number of supplemental precipitation input forcings specified in the configuration file.""" + return len(self.supp_precip_forcings) - # Read in the supp pcp types (GRIB[1|2], NETCDF) - try: - self.supp_precip_file_types = cfg_bmi["SuppPcpForcingTypes"] - self.supp_precip_file_types = [ - stype.strip() for stype in self.supp_precip_file_types - ] - if self.supp_precip_file_types == [""]: - self.supp_precip_file_types = [] - except KeyError as e: - err_out_screen( - "Unable to locate SuppPcpForcingTypes in SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SuppPcpForcingTypes in SuppForcing section in the configuration file.", - e, - ) - if len(self.supp_precip_file_types) != self.number_supp_pcp: - err_out_screen( - "Number of SuppPcpForcingTypes ({}) must match the number " - "of SuppPcp inputs ({}) in the configuration file.".format( - len(self.supp_precip_file_types), self.number_supp_pcp - ) - ) - for file_type in self.supp_precip_file_types: - if file_type not in ["GRIB1", "GRIB2", "NETCDF"]: - err_out_screen( - 'Invalid SuppForcing file type "{}" specified. ' - "Only GRIB1, GRIB2, and NETCDF are supported".format(file_type) - ) + @property + def supp_precip_file_types(self) -> list: + """Get the list of supplemental precipitation input forcing file types specified by the user in the configuration file. This is used to control how supplemental precipitation input forcing files are read in and processed based on the file types specified for each supplemental precipitation input forcing in the configuration file.""" + return self._supp_precip_file_types + + @supp_precip_file_types.setter + def supp_precip_file_types(self, value: list) -> None: + """Set the list of supplemental precipitation input forcing file types specified by the user in the configuration file. This is used to control how supplemental precipitation input forcing files are read in and processed based on the file types specified for each supplemental precipitation input forcing in the configuration file.""" + if value is not None: + value = [stype.strip() for stype in value] + if value == [""]: + value = [] + self.check_number_of_inputs_supp_pcp(value, "SuppPcpForcingTypes") + self.check_input_values_in_range( + value, + "SuppPcpForcingTypes", + self.supplemental_precip_file_type_options, + ) + self._supp_precip_file_types = value + + @property + def supplemental_precip_file_type_options(self) -> list: + """Get the list of valid supplemental precipitation input forcing file types that can be specified by the user in the configuration file. This is used to control how supplemental precipitation input forcing files are read in and processed based on the file types specified for each supplemental precipitation input forcing in the configuration file.""" + return ["GRIB1", "GRIB2", "NETCDF"] + @property + def rqiMethod(self) -> list: + """Optional RQI method for radar-based data. 0 - Do not use any RQI filtering. Use all radar-based estimates. 1 - Use hourly MRMS Radar Quality Index grids, 2 - Use NWM monthly climatology grids (NWM only!!!!). + + Example- RqiMethod: 2 + """ + value = None if self.number_supp_pcp > 0: - # Check to make sure supplemental precip options make sense. Also read in the RQI threshold - # if any radar products where chosen. for suppOpt in self.supp_precip_forcings: - if suppOpt < 0 or suppOpt > 15: - err_out_screen( - "Please specify SuppForcing values between 1 and 15." - ) # Read in RQI threshold to apply to radar products. if suppOpt in (1, 2, 7, 10, 11, 12): - try: - self.rqiMethod = cfg_bmi["RqiMethod"] - except KeyError as e: - err_out_screen( - "Unable to locate RqiMethod under SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate RqiMethod under SuppForcing section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper RqiMethod option in the configuration file.", e - ) + value = self.extract_input_variable("RqiMethod") # Check that if we have more than one RqiMethod, it's the correct number - if type(self.rqiMethod) is list: - if len(self.rqiMethod) != self.number_supp_pcp: - err_out_screen( - "Number of RqiMethods ({}) must match the number " - "of SuppPcp inputs ({}) in the configuration file, or " - "supply a single method for all inputs".format( - len(self.rqiMethod), self.number_supp_pcp - ) - ) - elif type(self.rqiMethod) is int: + if type(value) is list: + self.check_number_of_inputs_supp_pcp(value, "RqiMethod") + elif type(value) is int: # Support 'classic' mode of single method - self.rqiMethod = [self.rqiMethod] * self.number_supp_pcp + value = [value] * self.number_supp_pcp # Make sure the RqiMethod(s) makes sense. - for method in self.rqiMethod: - if method < 0 or method > 2: - err_out_screen( - "Please specify RqiMethods of either 0, 1, or 2." - ) + for method in value: + self.check_input_values_in_range(method, "RqiMethod", [0, 1, 2]) + return value - try: - self.rqiThresh = cfg_bmi["RqiThreshold"] - except KeyError as e: - err_out_screen( - "Unable to locate RqiThreshold under SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate RqiThreshold under SuppForcing section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper RqiThreshold option in the configuration file.", e - ) + @property + def rqiThresh(self): + """Optional RQI threshold to be used to mask out. Currently used for MRMS products. Please choose a value from 0.0-1.0. Associated radar quality index files will be expected from MRMS data. - # Check that if we have more than one RqiThreshold, it's the correct number - if type(self.rqiThresh) is list: - if len(self.rqiThresh) != self.number_supp_pcp: - err_out_screen( - "Number of RqiThresholds ({}) must match the number " - "of SuppPcp inputs ({}) in the configuration file, or " - "supply a single threshold for all inputs".format( - len(self.rqiThresh), self.number_supp_pcp - ) - ) - elif type(self.rqiThresh) is float: + Example- RqiThreshold: 0.9 + """ + value = 1.0 + if self.number_supp_pcp > 0: + for supp_opt in self.supp_precip_forcings: + # Read in RQI threshold to apply to radar products. + if supp_opt in (1, 2, 7, 10, 11, 12): + value = self.extract_input_variable("RqiThresh") + + # Check that if we have more than one RqiThresh, it's the correct number + if type(value) is list: + self.check_number_of_inputs_supp_pcp(value, "RqiThresh") + elif type(value) is (int, float): # Support 'classic' mode of single threshold - self.rqiThresh = [self.rqiThresh] * self.number_supp_pcp + value = [value] * self.number_supp_pcp - # Make sure the RQI threshold makes sense. - for threshold in self.rqiThresh: + # Make sure the RqiThresh(es) makes sense. + for threshold in value: if threshold < 0.0 or threshold > 1.0: err_out_screen( "Please specify RqiThresholds between 0.0 and 1.0." ) + return value - # Read in the input directories for each supplemental precipitation product. - try: - self.supp_precip_dirs = cfg_bmi["SuppPcpDirectories"] - except KeyError as e: - err_out_screen( - "Unable to locate SuppPcpDirectories in SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SuppPcpDirectories in SuppForcing section in the configuration file.", - e, - ) + @property + def supp_precip_mandatory(self): + """Specify whether the Supplemental Precips listed above are mandatory, or optional. This is important for layering contingencies if a product is missing, but forcing files are still desired. 0 - Not mandatory, 1 - Mandatory. - # Loop through and ensure all supp pcp directories exist. Also strip out any whitespace - # or new line characters. - for dirTmp in range(0, len(self.supp_precip_dirs)): - self.supp_precip_dirs[dirTmp] = self.supp_precip_dirs[dirTmp].strip() - if not os.path.isdir(self.supp_precip_dirs[dirTmp]): - try: - os.makedirs(self.supp_precip_dirs[dirTmp], exist_ok=True) - LOG.debug( - f"Created supp pcp directory: {self.supp_precip_dirs[dirTmp]}" - ) - except OSError as e: - err_out_screen( - f"Unable to create supp pcp directory: {self.supp_precip_dirs[dirTmp]}. Error: {e}" - ) + Example- SuppPcpMandatory: [0, 0, 0] + """ + return self._supp_precip_mandatory - # Special case for ExtAnA where we treat comma separated stage IV, MRMS data as one SuppPcp input - if 11 in self.supp_precip_forcings or 12 in self.supp_precip_forcings: - if len(self.supp_precip_forcings) != 1: - err_out_screen( - "CONUS or Alaska Stage IV/MRMS SuppPcp option is only supported as a standalone option" - ) - self.supp_precip_dirs = [",".join(self.supp_precip_dirs)] + @supp_precip_mandatory.setter + def supp_precip_mandatory(self, value): + """Set the list of flags for whether each supplemental precipitation input forcing specified by the user in the configuration file is mandatory or optional. This is used to control whether an error is raised if supplemental precipitation input forcing files are not found for each supplemental precipitation input forcing based on whether the user has specified each supplemental precipitation input forcing as mandatory or optional in the configuration file.""" + if self.number_supp_pcp > 0: + self.check_input_values_in_range(value, "SuppPcpMandatory", [0, 1]) + self._supp_precip_mandatory = value + else: + self._supp_precip_mandatory = None - if len(self.supp_precip_dirs) != self.number_supp_pcp: - err_out_screen( - "Number of SuppPcpDirectories must match the number of SuppForcing in the configuration file." - ) + @property + def regrid_opt_supp_pcp(self): + """Specify regridding options for the supplemental precipitation products. Options available are: 1 - ESMF Bilinear, 2 - ESMF Nearest Neighbor, 3 - ESMF Conservative Bilinear. - # Process supplemental precipitation enforcement options - try: - self.supp_precip_mandatory = cfg_bmi["SuppPcpMandatory"] - except KeyError as e: - err_out_screen( - "Unable to locate SuppPcpMandatory under the SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SuppPcpMandatory under the SuppForcing section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper SuppPcpMandatory options specified in the configuration file.", - e, - ) - if len(self.supp_precip_mandatory) != self.number_supp_pcp: - err_out_screen( - "Please specify SuppPcpMandatory values for each corresponding " - "supplemental precipitation options in the configuration file." - ) - # Check to make sure enforcement options makes sense. - for enforceOpt in self.supp_precip_mandatory: - if enforceOpt < 0 or enforceOpt > 1: - err_out_screen( - "Invalid SuppPcpMandatory chosen in the configuration file. " - "Please choose a value of 0 or 1 for each corresponding " - "supplemental precipitation product." - ) + Example- RegridOptSuppPcp: [1, 1, 1] + """ + return self._regrid_opt_supp_pcp - # Read in the regridding options. - try: - self.regrid_opt_supp_pcp = cfg_bmi["RegridOptSuppPcp"] - except KeyError as e: - err_out_screen( - "Unable to locate RegridOptSuppPcp under the SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate RegridOptSuppPcp under the SuppForcing section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper RegridOptSuppPcp options specified in the configuration file.", - e, - ) - if len(self.regrid_opt_supp_pcp) != self.number_supp_pcp: - err_out_screen( - "Please specify RegridOptSuppPcp values for each corresponding supplemental " - "precipitation product in the configuration file." - ) - # Check to make sure regridding options makes sense. - for regridOpt in self.regrid_opt_supp_pcp: - if regridOpt < 1 or regridOpt > 3: - err_out_screen( - "Invalid RegridOptSuppPcp chosen in the configuration file. " - "Please choose a value of 1-3 for each corresponding " - "supplemental precipitation product." - ) + @regrid_opt_supp_pcp.setter + def regrid_opt_supp_pcp(self, value): + """Set the list of regridding options for supplemental precipitation input forcings specified by the user in the configuration file. This is used to control how supplemental precipitation input forcings are regridded based on the regridding option specified for each supplemental precipitation input forcing in the configuration file.""" + if self.number_supp_pcp > 0: + self.check_input_values_in_range(value, "RegridOptSuppPcp", [1, 2, 3]) + self._regrid_opt_supp_pcp = value + else: + self._regrid_opt_supp_pcp = None - # Read in temporal interpolation options. - try: - self.suppTemporalInterp = cfg_bmi["SuppPcpTemporalInterpolation"] - except KeyError as e: - err_out_screen( - "Unable to locate SuppPcpTemporalInterpolation under the SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SuppPcpTemporalInterpolation under the SuppForcing section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper SuppPcpTemporalInterpolation options specified in the configuration file.", - e, - ) - if len(self.suppTemporalInterp) != self.number_supp_pcp: - err_out_screen( - "Please specify SuppPcpTemporalInterpolation values for each " - "corresponding supplemental precip products in the configuration file." - ) - # Ensure the SuppPcpTemporalInterpolation values make sense. - for temporalInterpOpt in self.suppTemporalInterp: - if temporalInterpOpt < 0 or temporalInterpOpt > 2: - err_out_screen( - "Invalid SuppPcpTemporalInterpolation chosen in the configuration file. " - "Please choose a value of 0-2 for each corresponding input forcing" - ) + @property + def suppTemporalInterp(self): + """Specify the time interpretation methods for the supplemental precipitation products. - # Read in max time option - try: - self.supp_pcp_max_hours = cfg_bmi["SuppPcpMaxHours"] - except (KeyError, configparser.NoOptionError): - self.supp_pcp_max_hours = ( - None # if missing, don't care, just assume all time - ) + Example- SuppPcpTemporalInterpolation: [0, 0, 0] + """ + return self._suppTemporalInterp - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper SuppPcpMaxHours options specified in the configuration file.", - e, - ) + @suppTemporalInterp.setter + def suppTemporalInterp(self, value): + """Set the list of flags for whether temporal interpolation of supplemental precipitation input forcings specified by the user in the configuration file is performed or not. This is used to control whether temporal interpolation of supplemental precipitation input forcings is performed based on whether the user has chosen to perform temporal interpolation for each supplemental precipitation input forcing in the configuration file.""" + if self.number_supp_pcp > 0: + self.check_input_values_in_range( + value, "SuppPcpTemporalInterpolation", [0, 1, 2] + ) + self._suppTemporalInterp = value + else: + self._suppTemporalInterp = None - if type(self.supp_pcp_max_hours) is list: - if len(self.supp_pcp_max_hours) != self.number_supp_pcp: - err_out_screen( - "Number of SuppPcpMaxHours ({}) must match the number " - "of SuppPcp inputs ({}) in the configuration file, or " - "supply a single threshold for all inputs".format( - len(self.supp_pcp_max_hours), self.number_supp_pcp - ) - ) - elif type(self.supp_pcp_max_hours) is float: - # Support 'classic' mode of single threshold - self.supp_pcp_max_hours = [ - self.supp_pcp_max_hours - ] * self.number_supp_pcp + @property + def supp_pcp_max_hours(self): + """Get the list of maximum forecast hours for supplemental precipitation input forcings specified by the user in the configuration file. This is used to control how supplemental precipitation input forcings are processed based on the maximum forecast hour specified for each supplemental precipitation input forcing in the configuration file.""" + return self._supp_pcp_max_hours - # Read in the SuppPcpInputOffsets options. - try: - self.supp_input_offsets = cfg_bmi["SuppPcpInputOffsets"] - except KeyError as e: - err_out_screen( - "Unable to locate SuppPcpInputOffsets under SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SuppPcpInputOffsets under SuppForcing section in the configuration file.", - e, - ) - except json.decoder.JSONDecodeError as e: - err_out_screen( - "Improper SuppPcpInputOffsets option specified in the configuration file.", - e, - ) - if len(self.supp_input_offsets) != self.number_supp_pcp: - err_out_screen( - "Please specify SuppPcpInputOffsets values for each " - "corresponding input forcings for SuppForcing." - ) - # Check to make sure the input offset options make sense. There will be additional - # checking later when input choices are mapped to input products. - for inputOffset in self.supp_input_offsets: - if inputOffset < 0: - err_out_screen( - "Please specify SuppPcpInputOffsets values greater than or equal to zero." - ) + @supp_pcp_max_hours.setter + def supp_pcp_max_hours(self, value): + """Set the list of maximum forecast hours for supplemental precipitation input forcings specified by the user in the configuration file. This is used to control how supplemental precipitation input forcings are processed based on the maximum forecast hour specified for each supplemental precipitation input forcing in the configuration file.""" + if self.number_supp_pcp > 0: + if isinstance(value, list): + self.check_number_of_inputs_supp_pcp(value, "SuppPcpMaxHours") + elif isinstance(value, float) or isinstance(value, int): + value = [value] * self.number_supp_pcp + self._supp_pcp_max_hours = value + else: + self._supp_pcp_max_hours = None - # Read in the optional parameter directory for supplemental precipitation. - try: - self.supp_precip_param_dir = cfg_bmi["SuppPcpParamDir"] - except KeyError as e: - err_out_screen( - "Unable to locate SuppPcpParamDir under the SuppForcing section in the configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate SuppPcpParamDir under the SuppForcing section in the configuration file.", - e, - ) - except ValueError as e: - err_out_screen( - "Improper SuppPcpParamDir option specified in the configuration file.", - e, - ) - if not os.path.isdir(self.supp_precip_param_dir): - try: - os.makedirs(self.supp_precip_param_dir, exist_ok=True) - LOG.debug( - f"Created missing SuppPcpParamDir: {self.supp_precip_param_dir}" - ) - except OSError as e: + @property + def supp_input_offsets(self): + """In AnA runs, this value is the offset from the available forecast and 00z. For example, if forecast are available at 06z and 18z, set this value to 6. + + Example- SuppPcpInputOffsets = [0, 0, 0] + """ + return self._supp_input_offsets + + @supp_input_offsets.setter + def supp_input_offsets(self, value): + """Set the list of time offsets to apply to supplemental precipitation input forcing files specified by the user in the configuration file. This is used to control how supplemental precipitation input forcing files are processed based on the time offset specified for each supplemental precipitation input forcing in the configuration file.""" + if self.number_supp_pcp > 0: + self.check_number_of_inputs_supp_pcp(value, "SuppPcpInputOffsets") + self._supp_input_offsets = value + else: + self._supp_input_offsets = None + + @property + def supp_precip_dirs(self): + """Specify the correponding supplemental precipitation directories that will be searched for input files. + + Example- SuppPcpDirectories: ['./MRMS_CONUS_GAUGE', './MRMS_CONUS_MULTISENSOR', './MRMS_CLASSIFICATION'] + """ + return self._supp_precip_dirs + + @supp_precip_dirs.setter + def supp_precip_dirs(self, value): + """Set the list of pathways to the supplemental precipitation input forcing directories specified by the user in the configuration file. This is used to control where the program looks for supplemental precipitation input forcing files for each supplemental precipitation input forcing based on the directory specified for each supplemental precipitation input forcing in the configuration file.""" + if self.number_supp_pcp > 0: + self.check_number_of_inputs_supp_pcp(value, "SuppPcpDirectories") + # Loop through and ensure all supp pcp directories exist. Also strip out any whitespace + # or new line characters. + for dirTmp in range(0, len(value)): + value[dirTmp] = value[dirTmp].strip() + self.try_make_dir(value[dirTmp], " supp pcp") + + # Special case for ExtAnA where we treat comma separated stage IV, MRMS data as one SuppPcp input + if 11 in self.supp_precip_forcings or 12 in self.supp_precip_forcings: + if len(self.supp_precip_forcings) != 1: err_out_screen( - f"Unable to locate SuppPcpParamDir: {self.supp_precip_param_dir}. Error: {e}" + "CONUS or Alaska Stage IV/MRMS SuppPcp option is only supported as a standalone option" ) + value = [",".join(value)] + self._supp_precip_dirs = value + else: + self._supp_precip_dirs = None + @property + def supp_precip_param_dir(self): + """Specify an optional directory that contains supplemental precipitation parameter fields, I.E monthly RQI climatology. This is ONLY needed for the original NWM WRF-Hydro domain. Otherwise, just point it to a random directory and it will be ignored. + + Example- SuppPcpParamDir: ['./forcingParam/AnA','./forcingParam/AnA','./forcingParam/AnA'] + """ + return self._supp_precip_param_dir + + @supp_precip_param_dir.setter + def supp_precip_param_dir(self, value): + """Set the directory where downscaling parameters for supplemental precipitation input forcings are stored specified by the user in the configuration file. This is used to control where the program looks for downscaling parameter files for supplemental precipitation input forcings based on the directory specified for supplemental precipitation input forcings in the configuration file.""" + if self.number_supp_pcp > 0: + self.try_make_dir(value, " SuppPcpParamDir") + self._supp_precip_param_dir = value + else: + self._supp_precip_param_dir = None + + @property + def cfsv2EnsMember(self): + """Set the CFSv2 ensemble member to process specified by the user in the configuration file. This is used to control which CFSv2 ensemble member is processed for CFSv2 input forcings based on the ensemble member specified in the configuration file.""" + value = None if not self.precip_only_flag: # Read in Ensemble information # Read in CFS ensemble member information IF we have chosen CFSv2 as an input # forcing. for opt_tmp in self.input_forcings: if opt_tmp == 7: - try: - self.cfsv2EnsMember = cfg_bmi["cfsEnsNumber"] - LOG.debug(f"ens mem: {self.cfsv2EnsMember}") - LOG.debug(f"cfg ens mem: {cfg_bmi['cfsEnsNumber']}") - except KeyError as e: - err_out_screen( - "Unable to locate cfsEnsNumber under the Ensembles section of the configuration file", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate cfsEnsNumber under the Ensembles section of the configuration file", - e, - ) - except json.JSONDecodeError as e: - err_out_screen( - "Improper cfsEnsNumber options specified in the configuration file", - e, - ) - if int(self.cfsv2EnsMember) < 1 or int(self.cfsv2EnsMember) > 4: - err_out_screen( - "Please chose an cfsEnsNumber value of 1,2,3 or 4." - ) + value = self.extract_input_variable("cfsEnsNumber") + self.check_input_values_in_range( + value, "cfsEnsNumber", [1, 2, 3, 4] + ) + return value - # Read in information for the custom input NetCDF files that are to be processed. - # Read in the ForecastInputHorizons options. - try: - self.customFcstFreq = cfg_bmi["custom_input_fcst_freq"] - except KeyError as e: - err_out_screen( - "Unable to locate custom_input_fcst_freq under Custom section in configuration file.", - e, - ) - except configparser.NoOptionError as e: - err_out_screen( - "Unable to locate custom_input_fcst_freq under Custom section in configuration file.", - e, - ) - except json.decoder.JSONDecodeError as je: - err_out_screen( - "Improper custom_input_fcst_freq option specified in configuration file: " - + str(je) - ) - if len(self.customFcstFreq) != self.number_custom_inputs: + @property + def customFcstFreq(self): + """Get the custom forecast frequency in minutes specified by the user in the configuration file. This is used to control how often forecasts are issued based on the custom forecast frequency specified in the configuration file.""" + return self._customFcstFreq + + @customFcstFreq.setter + def customFcstFreq(self, value): + """Options for specifying custom input NetCDF forcing files (in minutes). Choose the input frequency of files that are being processed. I.E., are the input files every 15 minutes, 60 minutes, 3-hours, etc. Please specify the length of custom input frequencies to match the number of custom NetCDF inputs selected above in the Logistics section. + + Example- custom_input_fcst_freq: [] + """ + if not self.precip_only_flag: + if len(value) != self.number_custom_inputs: err_out_screen( - f"Improper custom_input fcst_freq specified. " - f"This number ({len(self.customFcstFreq)}) must " - f"match the frequency of custom input forcings selected " - f"({self.number_custom_inputs})." + f"Improper custom_input fcst_freq specified. This number ({len(value)}) must match the frequency of custom input forcings selected ({self.number_custom_inputs})." ) + self._customFcstFreq = value + else: + self._customFcstFreq = None @property def nwm_domain(self) -> str: diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py index aad7cb71..c13275b1 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py @@ -972,3 +972,111 @@ "file_type": "input_force_types", } } + +CONFIGOPTIONS = { + "ConfigOptions": [ + "bmi_time", + "current_time", + "supp_precip_dirs", + "supp_precip_param_dir", + "supp_precip_mandatory", + "e_date_proc", + "first_fcst_cycle", + "current_fcst_cycle", + "current_output_step", + "prev_output_date", + "current_output_date", + "future_time", + "nFcsts", + "process_window", + "grid_meta", + "ExactExtract", + "errMsg", + "statusMsg", + "logFile", + "logHandle", + "paramFlagArray", + "nwmVersion", + "nwmConfig", + "forcing_output", + "aws", + "aws_obj", + "aws_time", + "nwm_geogrid", + "geopackage", + "uid64", + ], + "var_rename_map": {"config_path": "cfg_bmi"}, + "extract_input_variable_attrs_map": { + "OutputFrequency": "output_freq", + "SubOutputHour": "sub_output_hour", + "SubOutFreq": "sub_output_freq", + "ScratchDir": "scratch_dir", + "compressOutput": "useCompression", # 0 + "AnAFlag": "ana_flag", + "LookBack": "look_back", + "ForecastFrequency": "fcst_freq", + "ForecastShift": "fcst_shift", + "GRID_TYPE": "grid_type", + "DownscalingParamDirs": "dScaleParamDirs", + "SuppPcpDirectories": "supp_precip_dirs", + "SuppPcpMandatory": "supp_precip_mandatory", + "RegridOptSuppPcp": "regrid_opt_supp_pcp", + "SuppPcpTemporalInterpolation": "suppTemporalInterp", + "SuppPcpInputOffsets": "supp_input_offsets", + "SuppPcpParamDir": "supp_precip_param_dir", + "SuppPcpForcingTypes": "supp_precip_file_types", + }, + "extract_input_variable_attrs_map_precip_only": { + "customSuppPcpFreq": "customSuppPcpFreq", + }, + "extract_input_variable_attrs_map_not_precip_only": { + "ForecastInputHorizons": "fcst_input_horizons", # np + "ForecastInputOffsets": "fcst_input_offsets", # np + "IgnoredBorderWidths": "ignored_border_widths", # np + "RegridOpt": "regrid_opt", # np + "ForcingTemporalInterpolation": "forceTemoralInterp", # np + "TemperatureDownscaling": "t2dDownscaleOpt", # np + "PressureDownscaling": "psfcDownscaleOpt", # np + "ShortwaveDownscaling": "swDownscaleOpt", # np + "HumidityDownscaling": "q2dDownscaleOpt", # np + "PrecipDownscaling": "precipDownscaleOpt", # np -complicated partial np + "TemperatureBiasCorrection": "t2BiasCorrectOpt", # np #no + "PressureBiasCorrection": "psfcBiasCorrectOpt", # np #yes + "HumidityBiasCorrection": "q2BiasCorrectOpt", # np #yes + "WindBiasCorrection": "windBiasCorrect", # np #yes + "SwBiasCorrection": "swBiasCorrectOpt", # np #yes + "LwBiasCorrection": "lwBiasCorrectOpt", # np #yes + "PrecipBiasCorrection": "precipBiasCorrectOpt", # np #yes + "InputForcingTypes": "input_force_types", # np + "InputForcingDirectories": "input_force_dirs", # np + "InputMandatory": "input_force_mandatory", # np + "custom_input_fcst_freq": "customFcstFreq", # np + }, + "downscaling_attrs_map": { + "SINALPHA": "sinalpha_var", + "COSALPHA": "cosalpha_var", + "SLOPE": "slope_var", + "SLOPE_AZIMUTH": "slope_azimuth_var", + "HGT": "hgt_var", + }, + "downscaling_unstructred_attrs_map": { + "SLOPE_ELEM": "slope_var_elem", + "SLOPE_AZIMUTH_ELEM": "slope_azimuth_var_elem", + "HGT_ELEM": "hgt_elem_var", + }, + "extract_input_variable_set_default_attrs_map": { + "includeLQFrac": "include_lqfrac", + "floatOutput": "useFloats", + "Output": "forcing_output", + "SuppPcpMaxHours": "supp_pcp_max_hours", + "RegridWeightsDir": "weightsDir", # np + }, + "try_config_get_except_attr_map": { + "RefcstBDateProc": "b_date_proc", + "Geopackage": "geopackage", + "GeogridIn": "geogrid", + "SpatialMetaIn": "spatial_meta", + }, + "file_types": ["GRIB1", "GRIB2", "NETCDF", "NETCDF4", "NWM", "ZARR"], +} diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py index b7fabf99..aa41c03d 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py @@ -94,7 +94,8 @@ def _initialize_config_options(self) -> None: Check if the attibute allready exists before setting. """ - for key, val in list(vars(self.config_options).items()): + for key in dir(self.config_options): + val=getattr(self.config_options,key) if ( isinstance(val, list) and len(val) > 0 diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/parallel.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/parallel.py index 8912fd11..54a630bf 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/parallel.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/parallel.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import atexit -from functools import partial import os -import uuid import signal import sys +from functools import partial +from typing import TYPE_CHECKING import mpi4py import numpy as np @@ -12,9 +14,11 @@ from mpi4py import MPI -from .config import ConfigOptions -from . import err_handler -from . import mpi_utils +if TYPE_CHECKING: + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( + ConfigOptions, + ) +from . import err_handler, mpi_utils # If MPI was initialized outside of python, # disable initialization/finalization behavior diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/esmf_utils.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/esmf_utils.py index 2076278a..f49b8960 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/esmf_utils.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/esmf_utils.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any import types import esmpy as ESMF @@ -7,8 +10,9 @@ import shapely from . import retry_utils -from .core.config import ConfigOptions -from .core.parallel import MpiConfig +if TYPE_CHECKING: + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ConfigOptions + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig @retry_utils.retry_w_mpi_context( diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/os_utils.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/os_utils.py index 3d823c60..7853da24 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/os_utils.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/os_utils.py @@ -1,9 +1,19 @@ -from . import retry_utils +from __future__ import annotations + import traceback import types import typing -from .core.parallel import MpiConfig -from .core.config import ConfigOptions +from typing import TYPE_CHECKING + +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine import retry_utils + +if TYPE_CHECKING: + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( + ConfigOptions, + ) + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import ( + MpiConfig, + ) import os diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py index 779dd7dd..81594156 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py @@ -1,10 +1,18 @@ +from __future__ import annotations + import functools import time import traceback import types +from typing import TYPE_CHECKING, Any -from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig -from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ConfigOptions +if TYPE_CHECKING: + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( + ConfigOptions, + ) + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import ( + MpiConfig, + ) def retry_w_mpi_context( @@ -40,6 +48,13 @@ def wrapper( *args, **kwargs, ): + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( + ConfigOptions, + ) + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import ( + MpiConfig, + ) + if not isinstance(mpi_config, MpiConfig): raise TypeError( f"Expected type {MpiConfig} for mpi_config, got: {type(mpi_config)}"