diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py index c1b2f692..196395d5 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py @@ -353,7 +353,7 @@ def initialize(self, config_file: str, output_path: str | None = None) -> None: self._values["time_step_size"] = self.cfg_bmi["time_step_seconds"] # Initialize the Forcings Engine model - self._model = NWMv3ForcingEngineModel() + self._model = NWMv3ForcingEngineModel(self) # Set catchment ids if using hydrofabric if self._grid_type == "hydrofabric": @@ -469,16 +469,7 @@ def update_until(self, future_time: float): == future_time == self.cfg_bmi["initial_time"] ): - self._model.run( - self._values, - future_time, - self._job_meta, - self.geo_meta, - self._input_forcing_mod, - self._supp_pcp_mod, - self._mpi_meta, - self._output_obj, - ) + self._model.run(future_time) else: # Start a while loop to iterate the model time step by step until the # current model time reaches or exceeds the future_time. @@ -486,16 +477,7 @@ def update_until(self, future_time: float): # Advance the model time by the defined time step size. self._values["current_model_time"] += self._values["time_step_size"] # Run the model for the new current time and update the state. - self._model.run( - self._values, - self._values["current_model_time"], - self._job_meta, - self.geo_meta, - self._input_forcing_mod, - self._supp_pcp_mod, - self._mpi_meta, - self._output_obj, - ) + self._model.run(self._values["current_model_time"]) # ------------------------------------------------------------ def finalize(self): diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py index aad7cb71..9444dc8c 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py @@ -955,6 +955,22 @@ "precipBiasCorrectOpt", ], } + +MODEL = { + # Used by method `model.NWMv3ForcingEngineModel.update_bmi_output_dict` + "update_dict_base_vars": [ + "U2D", + "V2D", + "LWDOWN", + "RAINRATE", + "T2D", + "Q2D", + "PSFC", + "SWDOWN", + ], + "update_dict_var_include_lqfraq": "LQFRAC", +} + TEST_UTILS = { "OLD_NEW_VAR_MAP": { "q2dBiasCorrectOpt": "q2BiasCorrectOpt", diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py index 2c27669e..16b1c4a7 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py @@ -1,7 +1,13 @@ +"""NWMv3ForcingEngineModel, to be constructed and managed by inheritors of NWMv3_Forcing_Engine_BMI_model_Base from bmi_model.py""" + +from __future__ import annotations + +import copy import datetime -import os from contextlib import contextmanager -from time import time +from functools import partial +from time import perf_counter +from typing import TYPE_CHECKING import ewts import numpy as np @@ -12,16 +18,12 @@ disaggregateMod, downscale, err_handler, + forcingInputMod, layeringMod, ) -from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( - ConfigOptions, -) -from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.geoMod import ( - GeoMeta, +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.consts import ( + MODEL as model_consts, ) -from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.ioMod import OutputObj -from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.historical_forcing import ( AORCAlaskaProcessor, AORCConusProcessor, @@ -30,25 +32,28 @@ NWMV3OConusProcessor, ) +if TYPE_CHECKING: + from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.bmi_model import ( + NWMv3_Forcing_Engine_BMI_model_Base, + ) + LOG = ewts.get_logger(ewts.FORCING_ID) @contextmanager def timing_block(step_str: str): - """Context manager for timing code execution. - - Args: - step_str: Description of the step being timed. + """Context manager for timing code execution. Used by the decorator ``time_function``. + :param str step_str: Description of the step being timed. """ - start = time() + start = perf_counter() yield - end = time() - LOG.debug(f" Execution time for {step_str}: {round(end - start, 2)} seconds") + end = perf_counter() + LOG.debug(msg=f" Execution time for {step_str}: {round(end - start, 2)} seconds") def time_function(func): - """Measure the execution time of a function.""" + """Decorator for measuring the execution time of a function.""" def wrapper(*args, **kwargs): with timing_block(f"Executing {func.__name__}"): @@ -59,748 +64,634 @@ def wrapper(*args, **kwargs): class NWMv3ForcingEngineModel: - """NextGen Forcings Engine BMI model class for NWMv3 forcings.""" + """NextGen Forcings Engine BMI model class for NWMv3 forcings. - def __init__(self): - """Initialize the NWMv3 Forcing Engine Model.""" + To be constructed and managed by inheritors of NWMv3_Forcing_Engine_BMI_model_Base from bmi_model.py. + """ + + def __init__(self, bmi_model: NWMv3_Forcing_Engine_BMI_model_Base): + """Initialize the NWMv3 Forcing Engine model. + + :param bmi_model NWMv3_Forcing_Engine_BMI_model_Base: BMI model instance to initialize. + """ self.source_data_processor = None + self._bmi = bmi_model + # Partials + self.log_info = partial( + err_handler.log_msg, self._bmi._job_meta, self._bmi._mpi_meta, False + ) + self.log_debug = partial( + err_handler.log_msg, self._bmi._job_meta, self._bmi._mpi_meta, True + ) - # TODO: refactor the bmi_model.py file and this to have this type maintain its own state. - # def __init__(self): - # super(ngen_model, self).__init__() - # #self._model = model - - # @dask.delayed - # def aws_obj(files): - # return xr.open_mfdataset(files, engine="zarr", parallel=True, consolidated=True) - - def run( - self, - model: dict, - future_time: float, - config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMeta, - input_forcing_mod: dict, - supp_pcp_mod: dict, - mpi_config: MpiConfig, - output_obj: OutputObj, - ) -> None: + def check_program_status(self) -> None: + """Call err_handler.check_program_status""" + err_handler.check_program_status(self._bmi._job_meta, self._bmi._mpi_meta) + + def run(self, future_time: float) -> None: """Execute the full forcings engine BMI pipeline for a given future timestep. - This method updates the `model` state dictionary with atmospheric forcings computed from - available input datasets. It handles initialization, AWS Zarr loading, regridding, temporal - interpolation, bias correction, downscaling, supplemental precipitation processing, and output - population into the model structure. + This method updates the ``self._bmi._values`` state dictionary with atmospheric + forcings computed from available input datasets. It handles initialization, + AWS Zarr loading, regridding, temporal interpolation, bias correction, + downscaling, supplemental precipitation processing, and output population into + the ``self._bmi._values`` structure. + + ``self._bmi._job_meta``, an instance of ``ConfigOptions``, is also updated + in-place, for example for forecast time handling. The following steps are performed: 1. Determine the current forecast and output times based on the future timestamp - and analysis mode (AnA or forecast). + and analysis mode (AnA or forecast). 2. Initialize or reset output grids and step counters. 3. Loop over each input forcing product: - a. Calculate neighboring input files. - b. Load AWS-hosted Zarr datasets if needed. - c. Regrid input forcings to the model grid. - d. Perform temporal interpolation. - e. Apply bias correction and downscaling. - f. Layer final forcings into the output object. + a. Calculate neighboring input files. + b. Load AWS-hosted Zarr datasets if needed. + c. Regrid input forcings to the model grid. + d. Perform temporal interpolation. + e. Apply bias correction and downscaling. + f. Layer final forcings into the output object. 4. Optionally process supplemental precipitation forcings: - a. Regrid and validate. - b. Disaggregate and interpolate. - c. Layer into the final output. + a. Regrid and validate. + b. Disaggregate and interpolate. + c. Layer into the final output. 5. Write output to NetCDF forcing files if requested. - 6. Update the model state dictionary with flattened arrays. + 6. Update the ``self._bmi._values`` state dictionary with flattened arrays. 7. Advance the BMI time index. - :param model: The model state dictionary that will be updated with new forcing data. - :param future_time: The number of seconds into the future to advance the model. - :param config_options: Configuration object containing all model options, flags, and paths. - :param wrf_hydro_geo_meta: Geospatial metadata needed for regridding and interpolation. - :param input_forcing_mod: Dictionary of initialized input forcing modules indexed by forcing key. - :param supp_pcp_mod: Dictionary of supplemental precipitation modules indexed by key. - :param mpi_config: Object containing MPI communication settings such as rank and communicator. - :param output_obj: Output object that stores the generated atmospheric forcing arrays. + :param float future_time: Timestamp, represented as *seconds relative to overall + start time*, to advance to before returning. Since this value is relative + to the overall start time, it is unaware of the actual UTC datetimestamp of + the start. For example, since 1-hour timesteps are typical, the first value + would typically be 3600, the second value 7200, etc. - :raises RuntimeError: If the model fails to initialize or if required arguments are missing. + :raises RuntimeError: If the model fails to initialize or if required arguments + are missing. """ - ( - future_time, - config_options, - ) = self.determine_forecast( - future_time, - config_options, - ) - ( - config_options, - input_forcing_mod, - mpi_config, - ) = self.adjust_precip( - config_options, - input_forcing_mod, - mpi_config, - ) - ( - config_options, - mpi_config, - ) = self.log_forecast( - config_options, - mpi_config, - ) - ( - future_time, - config_options, - wrf_hydro_geo_meta, - input_forcing_mod, - supp_pcp_mod, - mpi_config, - output_obj, - input_forcings, - ) = self.loop_through_forcing_products( - future_time, - config_options, - wrf_hydro_geo_meta, - input_forcing_mod, - supp_pcp_mod, - mpi_config, - output_obj, - ) - ( - config_options, - wrf_hydro_geo_meta, - supp_pcp_mod, - mpi_config, - output_obj, - ) = self.process_suplemental_precip( - config_options, - wrf_hydro_geo_meta, - supp_pcp_mod, - mpi_config, - output_obj, - input_forcings, - ) - ( - config_options, - wrf_hydro_geo_meta, - mpi_config, - output_obj, - ) = self.write_output( - config_options, - wrf_hydro_geo_meta, - mpi_config, - output_obj, - ) - ( - model, - config_options, - wrf_hydro_geo_meta, - output_obj, - ) = self.update_dict( - model, - config_options, - wrf_hydro_geo_meta, - output_obj, - ) + + self.set_cycle_timing_attrs(future_time) + self.set_skip_flags() + self.log_cycle() + input_forcings = self.loop_through_forcing_products(future_time) + self.process_suplemental_precip(input_forcings) + self.write_output() + self.update_bmi_output_dict() ## Update BMI model time index to next iteration - config_options.bmi_time_index += 1 + self._bmi._job_meta.bmi_time_index += 1 @time_function - def determine_forecast( - self, - future_time: float, - config_options: ConfigOptions, - ): - """Determine the forecast for the given future time and configuration.""" + def set_cycle_timing_attrs(self, future_time: float) -> None: + """Determine the forecast for the given future time and configuration. + + :warning: Modifies mutable arguments in-place + """ # Assign the future time to the configuration - config_options.bmi_time = future_time - self.disaggregate_fun = disaggregateMod.disaggregate_factory(config_options) + self._bmi._job_meta.bmi_time = future_time + self.disaggregate_fun = disaggregateMod.disaggregate_factory( + self._bmi._job_meta + ) # Calculate current time stamp based on operational configuration - if config_options.ana_flag: + if self._bmi._job_meta.ana_flag: # If we're in an AnA configuration, then must offset the BMI future # timestamp to account for the "lookback" period being properly iterated # over between 3-28 hour look back time period and operation configuration - if config_options.input_forcings[0] in [20, 22]: - config_options.current_fcst_cycle = ( - config_options.b_date_proc - + pd.TimedeltaIndex( - np.array([future_time - 7200.0], dtype=float), "s" - )[0] + # TODO confirm these codes, and should they consider all input_forcings not just [0]? + if self._bmi._job_meta.input_forcings[0] in [20, 22]: + # NOTE This appears to be intending to operate on Alaska-only AnA. + delta = pd.TimedeltaIndex( + np.array([future_time - 7200.0], dtype=float), "s" + )[0] + self._bmi._job_meta.current_fcst_cycle = ( + self._bmi._job_meta.b_date_proc + delta ) - config_options.current_time = ( - config_options.b_date_proc - + pd.TimedeltaIndex( - np.array([future_time - 7200.0], dtype=float), "s" - )[0] + self._bmi._job_meta.current_time = ( + self._bmi._job_meta.b_date_proc + delta ) - config_options.future_time = future_time + self._bmi._job_meta.future_time = future_time else: + # NOTE below comment was original, but this appears to be operating on all non-Alaska AnA, not just Puerto Rico / Hawaii AnA. # Puerto Rico / Hawaii AnA: 1-hour lookback (based on 6-hourly forecast cycles) - config_options.current_fcst_cycle = ( - config_options.b_date_proc - + pd.TimedeltaIndex( - np.array([future_time - 3600.0], dtype=float), "s" - )[0] + delta = pd.TimedeltaIndex( + np.array([future_time - 3600.0], dtype=float), "s" + )[0] + self._bmi._job_meta.current_fcst_cycle = ( + self._bmi._job_meta.b_date_proc + delta ) - config_options.current_time = ( - config_options.b_date_proc - + pd.TimedeltaIndex( - np.array([future_time - 3600.0], dtype=float), "s" - )[0] + self._bmi._job_meta.current_time = ( + self._bmi._job_meta.b_date_proc + delta ) else: # Forecast-only mode — use BMI timestamp as-is - config_options.current_fcst_cycle = config_options.b_date_proc - config_options.current_time = pd.Timestamp( - config_options.b_date_proc + self._bmi._job_meta.current_fcst_cycle = self._bmi._job_meta.b_date_proc + self._bmi._job_meta.current_time = pd.Timestamp( + self._bmi._job_meta.b_date_proc ) + pd.to_timedelta(future_time, unit="s") - LOG.debug( - "NextGen Forcings Engine processing meteorological forcings for BMI timestamp" + self.log_debug( + msg="NextGen Forcings Engine processing meteorological forcings for BMI timestamp" ) - LOG.debug(f"Model.py current time: {config_options.current_time}") - LOG.debug(f"Model.py current fcst cycle: {config_options.current_fcst_cycle}") - - if config_options.first_fcst_cycle is None: - config_options.first_fcst_cycle = config_options.current_fcst_cycle - - return ( - future_time, - config_options, + self.log_debug(msg=f"Model.py current time: {self._bmi._job_meta.current_time}") + self.log_debug( + msg=f"Model.py current fcst cycle: {self._bmi._job_meta.current_fcst_cycle}" ) + if self._bmi._job_meta.first_fcst_cycle is None: + self._bmi._job_meta.first_fcst_cycle = ( + self._bmi._job_meta.current_fcst_cycle + ) + @time_function - def adjust_precip( - self, - config_options: ConfigOptions, - input_forcing_mod: dict, - mpi_config: MpiConfig, - ): + def set_skip_flags(self) -> None: """Adjust precipitation for the given forecast cycle.""" - if not config_options.precip_only_flag: + if not self._bmi._job_meta.precip_only_flag: # reset skips if present - for force_key in config_options.input_forcings: - input_forcing_mod[force_key].skip = False - - err_handler.check_program_status(config_options, mpi_config) - return ( - config_options, - input_forcing_mod, - mpi_config, - ) + for force_key in self._bmi._job_meta.input_forcings: + self._bmi._input_forcing_mod[force_key].skip = False + + self.check_program_status() @time_function - def log_forecast( - self, - config_options: ConfigOptions, - mpi_config: MpiConfig, - ): + def log_cycle(self) -> None: """Log information about the current forecast cycle.""" - # Log information about this forecast cycle - if mpi_config.rank == 0: - config_options.statusMsg = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - err_handler.log_msg(config_options, mpi_config, True) - config_options.statusMsg = ( - "Processing Forecast Cycle: " - + config_options.current_fcst_cycle.strftime("%Y-%m-%d %H:%M") + if self._bmi._mpi_meta.rank == 0: + self.log_debug(msg="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX") + self.log_debug( + msg=f"Processing Forecast Cycle: {self._bmi._job_meta.current_fcst_cycle.strftime('%Y-%m-%d %H:%M')}" ) - err_handler.log_msg(config_options, mpi_config, True) - config_options.statusMsg = ( - "Forecast Cycle Length is: " - + str(config_options.cycle_length_minutes) - + " minutes" + self.log_debug( + msg=f"Forecast Cycle Length is: {self._bmi._job_meta.cycle_length_minutes!s} minutes" ) - err_handler.log_msg(config_options, mpi_config, True) - # mpi_config.comm.barrier() - - return ( - config_options, - mpi_config, - ) + # self._bmi._mpi_meta.comm.barrier() @time_function def loop_through_forcing_products( - self, - future_time: float, - config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMeta, - input_forcing_mod: dict, - supp_pcp_mod: dict, - mpi_config: MpiConfig, - output_obj: OutputObj, - ): - """Loop through each forcing product and process it for the current forecast cycle.""" - # Loop through each output timestep. Perform the following functions: - # 1.) Calculate all necessary input files per user options. - # 2.) Read in input forcings from GRIB/NetCDF files. - # 3.) Regrid the forcings, and temporally interpolate. - # 4.) Downscale. - # 5.) Layer, and output as necessary. - ana_factor = 1 if config_options.ana_flag is False else 0 - show_message = True - if not config_options.precip_only_flag: - if config_options.grid_type == "gridded": + self, future_time: float + ) -> forcingInputMod.InputForcingsHydrofabric | None: + """Loop through each forcing product and process it for the current forecast cycle. + + Loop through each output timestep and perform the following steps: + + 1. Calculate all necessary input files per user options. + 2. Read input forcings from GRIB/NetCDF files. + 3. Regrid the forcings and perform temporal interpolation. + 4. Downscale. + 5. Layer and write output as necessary. + + :param float future_time: See description in ``self.run``. + :returns: Processed input forcings for the current timestep. + :rtype: forcingInputMod.InputForcings | None + """ + ana_factor = 1 if self._bmi._job_meta.ana_flag is False else 0 + if not self._bmi._job_meta.precip_only_flag: + if self._bmi._job_meta.grid_type == "gridded": # Reset out final grids to missing values. - output_obj.output_local[:, :, :] = config_options.globalNdv - elif config_options.grid_type == "unstructured": + self._bmi._output_obj.output_local[:, :, :] = ( + self._bmi._job_meta.globalNdv + ) + elif self._bmi._job_meta.grid_type == "unstructured": # Reset out final grids to missing values. - output_obj.output_local[:, :] = config_options.globalNdv - output_obj.output_local_elem[:, :] = config_options.globalNdv - elif config_options.grid_type == "hydrofabric": + self._bmi._output_obj.output_local[:, :] = self._bmi._job_meta.globalNdv + self._bmi._output_obj.output_local_elem[:, :] = ( + self._bmi._job_meta.globalNdv + ) + elif self._bmi._job_meta.grid_type == "hydrofabric": # Reset out final grids to missing values. - output_obj.output_local[:, :] = config_options.globalNdv + self._bmi._output_obj.output_local[:, :] = self._bmi._job_meta.globalNdv + else: + raise ValueError( + f"Unexpected grid_type: {repr(self._bmi._job_meta.grid_type)}" + ) # Increment or initialize output step count - if config_options.current_output_step is None: - config_options.current_output_step = 1 + if self._bmi._job_meta.current_output_step is None: + self._bmi._job_meta.current_output_step = 1 else: - config_options.current_output_step += 1 + self._bmi._job_meta.current_output_step += 1 # Optional sub-output timestamp - if config_options.sub_output_hour is not None: + if self._bmi._job_meta.sub_output_hour is not None: + raise NotImplementedError( + f"sub_output_hour (config SubOutputHour) is {repr(self._bmi._job_meta.sub_output_hour)} (not None) but is not used." + ) # TODO This is not used - subOutDate = config_options.first_fcst_cycle + datetime.timedelta( - hours=config_options.sub_output_hour + subOutDate = self._bmi._job_meta.first_fcst_cycle + datetime.timedelta( + hours=self._bmi._job_meta.sub_output_hour ) # Compute the output timestamp for this step - if config_options.ana_flag: - output_obj.outDate = ( - config_options.current_fcst_cycle - + datetime.timedelta(seconds=config_options.output_freq * 60) + if self._bmi._job_meta.ana_flag: + self._bmi._output_obj.outDate = ( + self._bmi._job_meta.current_fcst_cycle + + datetime.timedelta(seconds=self._bmi._job_meta.output_freq * 60) ) else: - output_obj.outDate = ( - config_options.current_fcst_cycle + self._bmi._output_obj.outDate = ( + self._bmi._job_meta.current_fcst_cycle + datetime.timedelta(seconds=future_time) ) - config_options.current_output_date = output_obj.outDate + self._bmi._job_meta.current_output_date = self._bmi._output_obj.outDate # Adjust file_date for AnA if needed file_date = ( - output_obj.outDate - - datetime.timedelta(seconds=config_options.output_freq * 60) - if config_options.ana_flag - else output_obj.outDate + self._bmi._output_obj.outDate + - datetime.timedelta(seconds=self._bmi._job_meta.output_freq * 60) + if self._bmi._job_meta.ana_flag + else self._bmi._output_obj.outDate ) # Compute previous output date (used for downscaling logic) - if config_options.current_output_step == ana_factor: - config_options.prev_output_date = config_options.current_output_date + if self._bmi._job_meta.current_output_step == ana_factor: + self._bmi._job_meta.prev_output_date = ( + self._bmi._job_meta.current_output_date + ) else: - config_options.prev_output_date = ( - config_options.current_output_date + self._bmi._job_meta.prev_output_date = ( + self._bmi._job_meta.current_output_date - datetime.timedelta(seconds=future_time) ) # Print message on log file indicating the timestamp # we are currently processing for forcings - if mpi_config.rank == 0 and show_message: - config_options.statusMsg = "=========================================" - err_handler.log_msg(config_options, mpi_config, True) - config_options.statusMsg = f"Processing for output timestep: {file_date.strftime('%Y-%m-%d %H:%M')}" - err_handler.log_msg(config_options, mpi_config, True) - - config_options.currentForceNum = 0 - config_options.currentCustomForceNum = 0 - LOG.debug(f"config_options.input_forcings: {config_options.input_forcings}") + if self._bmi._mpi_meta.rank == 0: + self.log_debug(msg="=========================================") + self.log_debug( + msg=f"Processing for output timestep: {file_date.strftime('%Y-%m-%d %H:%M')}" + ) + + self._bmi._job_meta.currentForceNum = 0 + self._bmi._job_meta.currentCustomForceNum = 0 + self.log_debug( + msg=f"config_options.input_forcings: {self._bmi._job_meta.input_forcings}" + ) # Loop over each of the input forcings specified. - LOG.debug( - f"Model.py forcing loop: {len(config_options.input_forcings)} forcings configured: {config_options.input_forcings}" + self.log_debug( + msg=f"Model.py forcing loop: {len(self._bmi._job_meta.input_forcings)} forcings configured: {self._bmi._job_meta.input_forcings}" ) - for force_key in config_options.input_forcings: - LOG.debug(f"force_key: {force_key}") - LOG.debug(f"config_options.aws: {config_options.aws}") + for force_key in self._bmi._job_meta.input_forcings: + self.log_debug(msg=f"force_key: {force_key}") + self.log_debug(msg=f"config_options.aws: {self._bmi._job_meta.aws}") # Pass these methods for AORC data is ERA5-Interim blend is requested # so we can finish filling in the missing gaps if ( force_key == 23 - and 12 in config_options.input_forcings - and 21 in config_options.input_forcings + and 12 in self._bmi._job_meta.input_forcings + and 21 in self._bmi._job_meta.input_forcings ): - input_forcings = input_forcing_mod[force_key] + input_forcings = self._bmi._input_forcing_mod[force_key] # These are not used # AORC_mask = input_forcings.regridded_mask_AORC # AORC_elem_mask = input_forcings.regridded_mask_elem_AORC else: - input_forcings = input_forcing_mod[force_key] + input_forcings = self._bmi._input_forcing_mod[force_key] input_forcings.calc_neighbor_files( - config_options, output_obj.outDate, mpi_config + self._bmi._job_meta, + self._bmi._output_obj.outDate, + self._bmi._mpi_meta, ) - if force_key in [12, 21, 27]: - if config_options.aws is None: - # Calculate the previous and next input cycle files from the inputs. - input_forcings.calc_neighbor_files( - config_options, output_obj.outDate, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - else: - # Flag to indicate the AWS .zarr AORC method - if force_key == 12: - if self.source_data_processor is None: - self.source_data_processor = AORCConusProcessor( - config_options, mpi_config, wrf_hydro_geo_meta - ) - elif force_key == 21: - if self.source_data_processor is None: - self.source_data_processor = AORCAlaskaProcessor( - config_options, mpi_config, wrf_hydro_geo_meta - ) - - # Flag to indicate the AWS .zarr NWMv3 Forcing file method - elif force_key == 27: - if self.source_data_processor is None: - if config_options.nwm_domain == "CONUS": - self.source_data_processor = NWMV3ConusProcessor( - config_options, mpi_config, wrf_hydro_geo_meta - ) - elif config_options.nwm_domain in [ - "Hawaii", - "PR", - ]: - self.source_data_processor = NWMV3OConusProcessor( - config_options, mpi_config, wrf_hydro_geo_meta - ) - elif config_options.nwm_domain == "Alaska": - self.source_data_processor = NWMV3AlaskaProcessor( - config_options, mpi_config, wrf_hydro_geo_meta - ) - else: - raise ValueError( - f"Unsupported domain type ({config_options.nwm_domain} for forcing type: {force_key} )" - ) - - config_options.aws_obj = ( - self.source_data_processor.process_historical_data( - config_options.current_time - ) - ) + # Handle AORC and NWM force keys + self.__handle_aorc_and_nwm_force_keys(input_forcings, force_key) # If skipping this forcing, continue early + # NOTE this is used by the esmf regrid pytests, to halt the loop before "manually" calling a particular regrid function. if input_forcings.skip is True: - LOG.debug(f"Breaking loop for force_key {force_key}") + self.log_debug(msg=f"Breaking loop for force_key {force_key}") break + # Regrid forcings. input_forcings.regrid_inputs( - config_options, wrf_hydro_geo_meta, mpi_config + self._bmi._job_meta, self._bmi.geo_meta, self._bmi._mpi_meta ) - err_handler.check_program_status(config_options, mpi_config) + self.check_program_status() # Run check on regridded fields for reasonable values that are not missing values. err_handler.check_forcing_bounds( - config_options, input_forcings, mpi_config + self._bmi._job_meta, input_forcings, self._bmi._mpi_meta ) - err_handler.check_program_status(config_options, mpi_config) + self.check_program_status() # If we are restarting a forecast cycle, re-calculate the neighboring files, and regrid the # next set of forcings as the previous step just regridded the previous forcing. - if input_forcings.rstFlag == 1: - if ( - input_forcings.regridded_forcings1 is not None - and input_forcings.regridded_forcings2 is not None - ): - # Set the forcings back to reflect we just regridded the previous set of inputs, not the next. - if config_options.grid_type == "gridded": - input_forcings.regridded_forcings1[:, :, :] = ( - input_forcings.regridded_forcings2[:, :, :] - ) - elif config_options.grid_type == "unstructured": - input_forcings.regridded_forcings1[:, :] = ( - input_forcings.regridded_forcings2[:, :] - ) - input_forcings.regridded_forcings1_elem[:, :] = ( - input_forcings.regridded_forcings2_elem[:, :] - ) - elif config_options.grid_type == "hydrofabric": - input_forcings.regridded_forcings1[:, :] = ( - input_forcings.regridded_forcings2[:, :] - ) - # Re-calculate the neighbor files. - input_forcings.calc_neighbor_files( - config_options, output_obj.outDate, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - # Regrid the forcings for the end of the window. - input_forcings.regrid_inputs( - config_options, wrf_hydro_geo_meta, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - input_forcings.rstFlag = 0 + self.__use_rstFlag(input_forcings) # Run temporal interpolation on the grids. - input_forcings.temporal_interpolate_inputs(config_options, mpi_config) - err_handler.check_program_status(config_options, mpi_config) + input_forcings.temporal_interpolate_inputs( + self._bmi._job_meta, self._bmi._mpi_meta + ) + self.check_program_status() # Run bias correction. bias_correction.run_bias_correction( - input_forcings, config_options, wrf_hydro_geo_meta, mpi_config + input_forcings, + self._bmi._job_meta, + self._bmi.geo_meta, + self._bmi._mpi_meta, ) - err_handler.check_program_status(config_options, mpi_config) + self.check_program_status() # Run downscaling on grids for this output timestep. downscale.run_downscaling( - input_forcings, config_options, wrf_hydro_geo_meta, mpi_config + input_forcings, + self._bmi._job_meta, + self._bmi.geo_meta, + self._bmi._mpi_meta, ) - err_handler.check_program_status(config_options, mpi_config) + self.check_program_status() # Layer in forcings from this product. layeringMod.layer_final_forcings( - output_obj, input_forcings, config_options, mpi_config + self._bmi._output_obj, + input_forcings, + self._bmi._job_meta, + self._bmi._mpi_meta, ) - err_handler.check_program_status(config_options, mpi_config) + self.check_program_status() - config_options.currentForceNum += 1 + self._bmi._job_meta.currentForceNum += 1 + # NOTE currentCustomForceNum does not appear to be used. if force_key == 10: - config_options.currentCustomForceNum += 1 + self._bmi._job_meta.currentCustomForceNum += 1 - LOG.debug(f"End of loop for force_key {force_key}") + self.log_debug(msg=f"End of loop for force_key {force_key}") # Process supplemental precipitation if we specified in the configuration file. - if config_options.number_supp_pcp > 0: - for supp_pcp_key in config_options.supp_precip_forcings: + if self._bmi._job_meta.number_supp_pcp > 0: + for supp_pcp_key in self._bmi._job_meta.supp_precip_forcings: if supp_pcp_key != 13: - # Like with input forcings, calculate the neighboring files to use. - supp_pcp_mod[supp_pcp_key].calc_neighbor_files( - config_options, output_obj.outDate, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - # Regrid the supplemental precipitation. - supp_pcp_mod[supp_pcp_key].regrid_inputs( - config_options, wrf_hydro_geo_meta, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - if ( - supp_pcp_mod[supp_pcp_key].regridded_precip1 is not None - and supp_pcp_mod[supp_pcp_key].regridded_precip2 is not None - ): - # Run check on regridded fields for reasonable values that are not missing values. - err_handler.check_supp_pcp_bounds( - config_options, - supp_pcp_mod[supp_pcp_key], - mpi_config, - wrf_hydro_geo_meta, - ) - err_handler.check_program_status(config_options, mpi_config) - - # TODO input_forcings has not yet been initialized, so this is a bug waiting to happen - self.disaggregate_fun( - input_forcings, - supp_pcp_mod[supp_pcp_key], - config_options, - mpi_config, - ) - err_handler.check_program_status(config_options, mpi_config) - - # Run temporal interpolation on the grids. - supp_pcp_mod[supp_pcp_key].temporal_interpolate_inputs( - config_options, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - # Layer in the supplemental precipitation into the current output object. - layeringMod.layer_supplemental_forcing( - output_obj, - supp_pcp_mod[supp_pcp_key], - config_options, - mpi_config, - ) - err_handler.check_program_status(config_options, mpi_config) + # Below comment copied from earlier code, the comment had been just above the call to ``disaggregate_fun``. + # TODO input_forcings has not yet been initialized, so this is a bug waiting to happen + self.__process_supp_precip_key(input_forcings, supp_pcp_key) # Call the output routines # adjust date for AnA if necessary - if config_options.ana_flag: - output_obj.outDate = file_date + if self._bmi._job_meta.ana_flag: + self._bmi._output_obj.outDate = file_date ################ Commenting this out to bypass NWM forcing file output functionality ######### - # output_obj.output_final_ldasin(config_options, wrf_hydro_geo_meta, mpi_config) - # err_handler.check_program_status(config_options, mpi_config) + # self._bmi._output_obj.output_final_ldasin(self._bmi._job_meta, self._bmi.geo_meta, self._bmi._mpi_meta) + # self.check_program_status() ############################################################################################## + else: + input_forcings = None + + return input_forcings + + def __handle_aorc_and_nwm_force_keys( + self, input_forcings: forcingInputMod.InputForcings, force_key: int + ) -> None: + """During ``loop_through_forcing_products``, handle the case where the force key is AORC or NWM. - return ( - future_time, - config_options, - wrf_hydro_geo_meta, - input_forcing_mod, - supp_pcp_mod, - mpi_config, - output_obj, - input_forcings, + This code block was cut and pasted from the method ``loop_through_forcing_products`` during refactor. + + :param input_forcings forcingInputMod.InputForcings: Input forcings object to be modified. + :param int force_key: Identifier for the forcing type. + + :warning: Modifies mutable arguments in-place. + """ + proc_args = (self._bmi._job_meta, self._bmi._mpi_meta, self._bmi.geo_meta) + + if force_key in [12, 21, 27]: + if self._bmi._job_meta.aws is None: + # Calculate the previous and next input cycle files from the inputs. + input_forcings.calc_neighbor_files( + self._bmi._job_meta, + self._bmi._output_obj.outDate, + self._bmi._mpi_meta, + ) + self.check_program_status() + else: + if len(self._bmi._job_meta.input_forcings) != 1: + raise ValueError( + f"Expected to have 1 forcing key, but have {len(self._bmi._job_meta.input_forcings)}: {list(self._bmi._job_meta.input_forcings)}" + ) + if self.source_data_processor is None: + # Flag to indicate the AWS .zarr AORC method + if force_key == 12: + proc_cls = AORCConusProcessor + elif force_key == 21: + proc_cls = AORCAlaskaProcessor + # Flag to indicate the AWS .zarr NWMv3 Forcing file method + elif force_key == 27: + if self._bmi._job_meta.nwm_domain == "CONUS": + proc_cls = NWMV3ConusProcessor + elif self._bmi._job_meta.nwm_domain in [ + "Hawaii", + "PR", + ]: + proc_cls = NWMV3OConusProcessor + elif self._bmi._job_meta.nwm_domain == "Alaska": + proc_cls = NWMV3AlaskaProcessor + else: + raise ValueError( + f"Unsupported domain type ({self._bmi._job_meta.nwm_domain} for forcing type: {force_key} )" + ) + else: + raise ValueError(f"Unexpected force_key: {force_key}") + self.source_data_processor = proc_cls(*proc_args) + + self._bmi._job_meta.aws_obj = ( + self.source_data_processor.process_historical_data( + self._bmi._job_meta.current_time + ) + ) + + def __process_supp_precip_key( + self, input_forcings: forcingInputMod.InputForcings, supp_pcp_key: int + ) -> None: + """Process supplemental precipitation for a single supplemental precipitation key. + + This code block was cut and pasted from the methods + ``loop_through_forcing_products`` and ``process_suplemental_precip`` during refactor. + + :param input_forcings forcingInputMod.InputForcings: Input forcings object to be modified. + :param int supp_pcp_key: Identifier for the supplemental precipitation forcing. + + :warning: Modifies mutable arguments in-place. + """ + # Like with input forcings, calculate the neighboring files to use. + self._bmi._supp_pcp_mod[supp_pcp_key].calc_neighbor_files( + self._bmi._job_meta, + self._bmi._output_obj.outDate, + self._bmi._mpi_meta, ) + self.check_program_status() + + # Regrid the supplemental precipitation. + self._bmi._supp_pcp_mod[supp_pcp_key].regrid_inputs( + self._bmi._job_meta, self._bmi.geo_meta, self._bmi._mpi_meta + ) + self.check_program_status() + + if ( + self._bmi._supp_pcp_mod[supp_pcp_key].regridded_precip1 is not None + and self._bmi._supp_pcp_mod[supp_pcp_key].regridded_precip2 is not None + ): + # Run check on regridded fields for reasonable values that are not missing values. + err_handler.check_supp_pcp_bounds( + self._bmi._job_meta, + self._bmi._supp_pcp_mod[supp_pcp_key], + self._bmi._mpi_meta, + self._bmi.geo_meta, + ) + self.check_program_status() + + self.disaggregate_fun( + input_forcings, + self._bmi._supp_pcp_mod[supp_pcp_key], + self._bmi._job_meta, + self._bmi._mpi_meta, + ) + self.check_program_status() + + # Run temporal interpolation on the grids. + self._bmi._supp_pcp_mod[supp_pcp_key].temporal_interpolate_inputs( + self._bmi._job_meta, self._bmi._mpi_meta + ) + self.check_program_status() + + # Layer in the supplemental precipitation into the current output object. + layeringMod.layer_supplemental_forcing( + self._bmi._output_obj, + self._bmi._supp_pcp_mod[supp_pcp_key], + self._bmi._job_meta, + self._bmi._mpi_meta, + ) + self.check_program_status() + + def __use_rstFlag(self, input_forcings: forcingInputMod.InputForcings) -> None: + """If restarting a forecast cycle, re-calculate neighboring files and regrid the + next set of forcings, as the previous step regridded the prior forcing. + + This code block was cut and pasted from the method + ``loop_through_forcing_products`` during refactor. + + :param input_forcings forcingInputMod.InputForcings: Input forcings object to be modified. + + :warning: Modifies mutable arguments in-place. + """ + if input_forcings.rstFlag == 1: + if ( + input_forcings.regridded_forcings1 is not None + and input_forcings.regridded_forcings2 is not None + ): + # Set the forcings back to reflect we just regridded the previous set of inputs, not the next. + if self._bmi._job_meta.grid_type == "gridded": + input_forcings.regridded_forcings1[:, :, :] = ( + input_forcings.regridded_forcings2[:, :, :] + ) + elif self._bmi._job_meta.grid_type == "unstructured": + input_forcings.regridded_forcings1[:, :] = ( + input_forcings.regridded_forcings2[:, :] + ) + input_forcings.regridded_forcings1_elem[:, :] = ( + input_forcings.regridded_forcings2_elem[:, :] + ) + elif self._bmi._job_meta.grid_type == "hydrofabric": + input_forcings.regridded_forcings1[:, :] = ( + input_forcings.regridded_forcings2[:, :] + ) + else: + raise ValueError( + f"Unexpected grid_type: {repr(self._bmi._job_meta.grid_type)}" + ) + # Re-calculate the neighbor files. + input_forcings.calc_neighbor_files( + self._bmi._job_meta, + self._bmi._output_obj.outDate, + self._bmi._mpi_meta, + ) + self.check_program_status() + + # Regrid the forcings for the end of the window. + input_forcings.regrid_inputs( + self._bmi._job_meta, self._bmi.geo_meta, self._bmi._mpi_meta + ) + self.check_program_status() + + input_forcings.rstFlag = 0 @time_function def process_suplemental_precip( - self, - config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMeta, - supp_pcp_mod: dict, - mpi_config: MpiConfig, - output_obj: OutputObj, - input_forcings: dict, - ): - """Process supplemental precipitation for the current forecast cycle.""" - if config_options.customSuppPcpFreq is not None: - # Process supplemental precipitation if we specified in the configuration file. - if config_options.number_supp_pcp > 0: - for supp_pcp_key in config_options.supp_precip_forcings: - if supp_pcp_key == 14: - # Like with input forcings, calculate the neighboring files to use. - supp_pcp_mod[supp_pcp_key].calc_neighbor_files( - config_options, output_obj.outDate, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - # Regrid the supplemental precipitation. - supp_pcp_mod[supp_pcp_key].regrid_inputs( - config_options, wrf_hydro_geo_meta, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - if ( - supp_pcp_mod[supp_pcp_key].regridded_precip1 is not None - and supp_pcp_mod[supp_pcp_key].regridded_precip2 is not None - ): - # Run check on regridded fields for reasonable values that are not missing values. - err_handler.check_supp_pcp_bounds( - config_options, - supp_pcp_mod[supp_pcp_key], - mpi_config, - wrf_hydro_geo_meta, - ) - err_handler.check_program_status(config_options, mpi_config) + self, input_forcings: forcingInputMod.InputForcings + ) -> None: + """Process supplemental precipitation for the current forecast cycle. - self.disaggregate_fun( - input_forcings, - supp_pcp_mod[supp_pcp_key], - config_options, - mpi_config, - ) - err_handler.check_program_status(config_options, mpi_config) + :param input_forcings forcingInputMod.InputForcings: Input forcings object to be modified. - # Run temporal interpolation on the grids. - supp_pcp_mod[supp_pcp_key].temporal_interpolate_inputs( - config_options, mpi_config - ) - err_handler.check_program_status(config_options, mpi_config) - - # Layer in the supplemental precipitation into the current output object. - layeringMod.layer_supplemental_forcing( - output_obj, - supp_pcp_mod[supp_pcp_key], - config_options, - mpi_config, - ) - err_handler.check_program_status(config_options, mpi_config) - - return ( - config_options, - wrf_hydro_geo_meta, - supp_pcp_mod, - mpi_config, - output_obj, - ) + :warning: Modifies mutable arguments in-place. + """ + if self._bmi._job_meta.customSuppPcpFreq is not None: + # Process supplemental precipitation if we specified in the configuration file. + if self._bmi._job_meta.number_supp_pcp > 0: + for supp_pcp_key in self._bmi._job_meta.supp_precip_forcings: + if supp_pcp_key == 14: + self.__process_supp_precip_key(input_forcings, supp_pcp_key) @time_function - def write_output( - self, - config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMeta, - mpi_config: MpiConfig, - output_obj: OutputObj, - ): - """Write the output for the current forecast cycle.""" - # If user requests output for given domain, then call - # the I/O module to update opened netcdf file with forcing fields + def write_output(self) -> None: + """Write the output for the current forecast cycle. + + If user requests output for given domain, then call + the I/O module to update opened netcdf file with forcing fields. + """ if ( - config_options.forcing_output == 1 - or config_options.grid_type == "hydrofabric" + self._bmi._job_meta.forcing_output == 1 + or self._bmi._job_meta.grid_type == "hydrofabric" ): - output_obj.gather_global_outputs( - config_options, wrf_hydro_geo_meta, mpi_config + self._bmi._output_obj.gather_global_outputs( + self._bmi._job_meta, self._bmi.geo_meta, self._bmi._mpi_meta ) - return ( - config_options, - wrf_hydro_geo_meta, - mpi_config, - output_obj, - ) - - """##################Step 6: flatten and update dict##########################################################################""" @time_function - def update_dict( - self, - model: dict, - config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMeta, - output_obj: OutputObj, - ): - """Flatten the Forcings Engine output object and update the BMI dictionary.""" - # Now loop through Forcings Engine output object - # and flatten the 2D forcing array and append to - # the BMI object to advertise to BMIinterface - # 0.) U-Wind (m/s) - # 1.) V-Wind (m/s) - # 2.) Surface incoming longwave radiation flux (W/m^2) - # 3.) Precipitation rate (mm/s) - # 4.) 2-meter temperature (K) - # 5.) 2-meter specific humidity (kg/kg) - # 6.) Surface pressure (Pa) - # 7.) Surface incoming shortwave radiation flux (W/m^2) - # 8.) Liquid Precipitation Fraction (%), Only available in certain operational configurations - - if config_options.include_lqfrac == 1: - variables = [ - "U2D", - "V2D", - "LWDOWN", - "RAINRATE", - "T2D", - "Q2D", - "PSFC", - "SWDOWN", - "LQFRAC", - ] - else: - variables = [ - "U2D", - "V2D", - "LWDOWN", - "RAINRATE", - "T2D", - "Q2D", - "PSFC", - "SWDOWN", - ] - if config_options.grid_type == "gridded": + def update_bmi_output_dict(self) -> None: + """Flatten the Forcings Engine output object and update the BMI dictionary. + + Loop through the Forcings Engine output object, flatten the 2D forcing arrays, + and append them to the BMI object for advertisement through the BMI interface. + + The flattened variables are ordered as follows: + + 0. U-wind (m/s) + 1. V-wind (m/s) + 2. Surface incoming longwave radiation flux (W/m²) + 3. Precipitation rate (mm/s) + 4. 2-meter air temperature (K) + 5. 2-meter specific humidity (kg/kg) + 6. Surface pressure (Pa) + 7. Surface incoming shortwave radiation flux (W/m²) + 8. Liquid precipitation fraction (%), available only in certain operational configurations + """ + variables = copy.deepcopy(model_consts["update_dict_base_vars"]) + if self._bmi._job_meta.include_lqfrac == 1: + variables.append(model_consts["update_dict_var_include_lqfraq"]) + + if self._bmi._job_meta.grid_type == "gridded": for count, variable in enumerate(variables): - model[variable + "_ELEMENT"] = output_obj.output_local[ - count, :, : - ].flatten() - elif config_options.grid_type == "unstructured": + self._bmi._values[f"{variable}_ELEMENT"] = ( + self._bmi._output_obj.output_local[count, :, :].flatten() + ) + elif self._bmi._job_meta.grid_type == "unstructured": for count, variable in enumerate(variables): - model[variable + "_ELEMENT"] = output_obj.output_local_elem[ - count, : - ].flatten() - model[variable + "_NODE"] = output_obj.output_local[count, :].flatten() - elif config_options.grid_type == "hydrofabric": + self._bmi._values[f"{variable}_ELEMENT"] = ( + self._bmi._output_obj.output_local_elem[count, :].flatten() + ) + self._bmi._values[f"{variable}_NODE"] = ( + self._bmi._output_obj.output_local[count, :].flatten() + ) + elif self._bmi._job_meta.grid_type == "hydrofabric": for count, variable in enumerate(variables): - model[variable + "_ELEMENT"] = output_obj.output_global[ - count, : - ].flatten() - model["CAT-ID"] = wrf_hydro_geo_meta.element_ids_global - - return ( - model, - config_options, - wrf_hydro_geo_meta, - output_obj, - ) + self._bmi._values[f"{variable}_ELEMENT"] = ( + self._bmi._output_obj.output_global[count, :].flatten() + ) + self._bmi._values["CAT-ID"] = self._bmi.geo_meta.element_ids_global + else: + raise ValueError( + f"Unexpected grid_type: {repr(self._bmi._job_meta.grid_type)}" + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 513f4a76..16ce4458 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -527,64 +527,20 @@ def pre_regrid(self) -> None: f"In pre_regrid, expected state to be either None or 'post_ran' but got {repr(self._state)}. The test is set up incorrectly." ) - config_options = self.config_options - mpi_config = self.mpi_config - geo_meta = self.geo_meta - supp_pcp_mod = self.bmi_model._supp_pcp_mod - output_obj = self.bmi_model._output_obj - input_forcing_mod = self.bmi_model._input_forcing_mod - future_time = ( self.bmi_model._values["current_model_time"] + self.bmi_model._values["time_step_size"] ) model = self.bmi_model._model - ### NOTE with the exception of setting the skip flag, the below - ### block is copied verbatim from NWMv3ForcingEngineModel.run() - ( - future_time, - config_options, - ) = model.determine_forecast( - future_time, - config_options, - ) - ( - config_options, - input_forcing_mod, - mpi_config, - ) = model.adjust_precip( - config_options, - input_forcing_mod, - mpi_config, - ) - ( - config_options, - mpi_config, - ) = model.log_forecast( - config_options, - mpi_config, - ) + # NOTE this should mimic NWMv3ForcingEngineModel.run() + # with the exception of externally setting the skip flags within this class. + model.set_cycle_timing_attrs(future_time) + model.set_skip_flags() + model.log_cycle() ### NOTE setting the flag causes the regrid step to be skipped self.set_input_forcings_skip_flags() - ( - future_time, - config_options, - geo_meta, - input_forcing_mod, - supp_pcp_mod, - mpi_config, - output_obj, - input_forcings, - ) = model.loop_through_forcing_products( - future_time, - config_options, - geo_meta, - input_forcing_mod, - supp_pcp_mod, - mpi_config, - output_obj, - ) + model.loop_through_forcing_products(future_time) # Update test fixture status self._state = "pre_ran" @@ -592,7 +548,7 @@ def pre_regrid(self) -> None: def set_input_forcings_skip_flags(self) -> None: """Set the `skip` flag on the InputForcings object so that forcing regrid will not occur during loop_through_forcing_products().""" logging.debug( - "Setting input_forcing.skip = True for each value in dict self.input_forcing_mod" + "Setting input_forcing.skip = True for each value in dict self.bmi_model._input_forcing_mod" ) for force_key, input_forcing in self.bmi_model._input_forcing_mod.items(): input_forcing.skip = True