diff --git a/R/convert_vera4cast_inflow.R b/R/convert_vera4cast_inflow.R
index 88ed6a4..884d268 100644
--- a/R/convert_vera4cast_inflow.R
+++ b/R/convert_vera4cast_inflow.R
@@ -1,4 +1,4 @@
-convert_vera4cast_inflow <- function(reference_date, model_id, save_path){
+convert_vera4cast_inflow <- function(reference_date, model_id, save_path, config = NULL){
 
   variables <- c("TP_ugL_sample", "NH4_ugL_sample","NO3NO2_ugL_sample",
                  "SRP_ugL_sample","DOC_mgL_sample","DRSI_mgL_sample",
@@ -9,9 +9,14 @@ forecast_df <- NULL
 
 for(i in 1:length(variables)){
 
-  s3 <- arrow::s3_bucket(bucket = glue::glue("bio230121-bucket01/vera4cast/forecasts/archive-parquet/project_id=vera4cast/duration=P1D/variable={variables[i]}/model_id={model_id}/reference_date={reference_date}"),
-                         endpoint_override = "https://amnh1.osn.mghpcc.org",
-                         anonymous = TRUE)
+  if(!is.null(config)){
+    faasr_prefix <- glue::glue("vera4cast/forecasts/archive-parquet/project_id=vera4cast/duration=P1D/variable={variables[i]}/model_id={model_id}/reference_date={reference_date}")
+    s3 <- FLAREr::flare_arrow_s3_bucket(server_name = "vera4cast_forecasts", faasr_prefix = faasr_prefix, config = config)
+  } else {
+    s3 <- arrow::s3_bucket(bucket = glue::glue("bio230121-bucket01/vera4cast/forecasts/archive-parquet/project_id=vera4cast/duration=P1D/variable={variables[i]}/model_id={model_id}/reference_date={reference_date}"),
+                           endpoint_override = "https://amnh1.osn.mghpcc.org",
+                           anonymous = TRUE)
+  }
 
   ## test to see if inflow forecast exists ##
   tryCatch({
@@ -28,11 +33,6 @@ for(i in 1:length(variables)){
 
   })
 
-  # df <- arrow::open_dataset(s3) |>
-  #   dplyr::filter(site_id == "tubr") |>
-  #   dplyr::collect() |>
-  #   dplyr::mutate(variable = variables[i])
-
   forecast_df <- dplyr::bind_rows(forecast_df, df)
 }
 
diff --git a/R/generate_forecast_score_arrow.R b/R/generate_forecast_score_arrow.R
index f51cd38..e05a020 100644
--- a/R/generate_forecast_score_arrow.R
+++ b/R/generate_forecast_score_arrow.R
@@ -12,10 +12,17 @@ generate_forecast_score_arrow <- function(targets_df,
                                           bucket = NULL,
                                           endpoint = NULL,
                                           local_directory = NULL,
-                                          variable_types = "state"){
+                                          variable_types = "state",
+                                          config = NULL){
 
 
-  if(use_s3){
+  if(!is.null(config)){
+    vars <- arrow_env_vars()
+    output_directory <- FLAREr::flare_arrow_s3_bucket(server_name = "scores",
+                                                      faasr_prefix = "flare/scores/parquet",
+                                                      config = config)
+    on.exit(unset_arrow_vars(vars), add = TRUE)
+  }else if(use_s3){
     if(is.null(bucket) | is.null(endpoint)){
       stop("scoring function needs bucket and endpoint if use_s3=TRUE")
     }
diff --git a/configuration/glm_aed_flare_rs/configure_flare_glm_aed.yml b/configuration/glm_aed_flare_rs/configure_flare_glm_aed.yml
index f35c6a4..8c0752c 100644
--- a/configuration/glm_aed_flare_rs/configure_flare_glm_aed.yml
+++ b/configuration/glm_aed_flare_rs/configure_flare_glm_aed.yml
@@ -20,6 +20,14 @@ s3:
   scores:
     endpoint: amnh1.osn.mghpcc.org
     bucket: bio230121-bucket01/flare/scores/parquet
+  vera4cast_targets:
+    endpoint: amnh1.osn.mghpcc.org
+    bucket: bio230121-bucket01/vera4cast/targets
+    anonymous: true
+  vera4cast_forecasts:
+    endpoint: amnh1.osn.mghpcc.org
+    bucket: bio230121-bucket01/vera4cast/forecasts/archive-parquet
+    anonymous: true
 location:
   site_id: fcre
   name: Falling Creek Reservoir
@@ -28,18 +36,21 @@ location:
 da_setup:
   da_method: enkf
   par_fit_method: perturb
-  ensemble_size: 221
+  ensemble_size: 31
   localization_distance: .na #distance in meters were covariances in the model error are used
   no_negative_states: TRUE
   assimilate_first_step: FALSE
   use_obs_constraint: TRUE
-  obs_filename: fcre-targets-rs.csv
+  inflation_factor: 1.0
+  add_random_noise: 2
+  obs_filename: fcre-targets-insitu.csv
 model_settings:
   ncore: 4
   model_name: glm_aed
   base_GLM_nml: glm3.nml
   base_AED_nml: aed2.nml
   base_AED_phyto_pars_nml: aed_phyto_pars.csv
+  base_AED_phyto_pars_nml_file: aed2.nml
   base_AED_zoop_pars_nml: aed2_zoop_pars.nml
   max_model_layers: 100
   modeled_depths: [0.00,1.00,1.60,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00]
@@ -47,6 +58,8 @@ model_settings:
   obs_config_file: observations_config_aed.csv
   states_config_file: states_config_aed.csv
   depth_model_sd_config_file: depth_model_sd_aed.csv
+  non_vertical_noise_config_file: non_vertical_noise_config.csv
+  depth_sd: 0.0
 default_init:
   lake_depth: 9.4 #not a modeled state
   temp: [25.667, 24.9101, 23.067, 21.8815, 19.6658, 16.5739, 12.9292, 12.8456, 12.8127, 12.8079, 12.778]
@@ -87,6 +100,7 @@ output_settings:
   diagnostics_names: [extc]
   generate_plots: FALSE
   evaluate_past: FALSE
+  restart_save_timesteps: "all"
   diagnostics_daily:
     names: ['CAR_ch4_atm','CAR_atm_co2_flux', 'temp', 'OXY_oxy', 'temp', 'temp']
     save_names: ['ch4_flux_mean', 'co2_flux_mean', 'temp_1.6m_mean', 'oxy_mean', 'temp_1.0m_mean', 'temp_8.0m_mean']
diff --git a/configuration/glm_aed_flare_rs/configure_run.yml b/configuration/glm_aed_flare_rs/configure_run.yml
index 34ea50e..be8c9cd 100644
--- a/configuration/glm_aed_flare_rs/configure_run.yml
+++ b/configuration/glm_aed_flare_rs/configure_run.yml
@@ -1,9 +1,10 @@
 restart_file: .na
-start_datetime: 2024-10-01 00:00:00
+start_datetime: 2026-04-30 00:00:00
 end_datetime: .na
-forecast_start_datetime: 2024-12-20 00:00:00
-forecast_horizon: 34
-sim_name: glm_aed_flare_rs
+forecast_start_datetime: 2026-05-04 00:00:00
+forecast_horizon: 1
+sim_name: glm_aed_flare_rs_demo_v4
 configure_flare: configure_flare_glm_aed.yml
 configure_obs: observation_processing.yml
 use_s3: TRUE
+use_faasr: TRUE
diff --git a/configuration/glm_aed_flare_rs/glm3.nml b/configuration/glm_aed_flare_rs/glm3.nml
index 41a188b..b9ba009 100755
--- a/configuration/glm_aed_flare_rs/glm3.nml
+++ b/configuration/glm_aed_flare_rs/glm3.nml
@@ -57,6 +57,8 @@
    out_fn = "output"
    nsave = 1
    csv_lake_fname = 'lake'
+   restart_fname = 'glm_restart.nc'
+   restart_nsave = 1
 /
 &init_profiles
    num_heights = 28
@@ -73,6 +75,8 @@
    avg_surf_temp = 6
    restart_variables = 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
    restart_mixer_count = 0
+   init_restart_fname = 'glm_restart.nc'
+   init_restart_from_file = 0
 /
 &meteorology
    met_sw = .true.
diff --git a/configuration/glm_aed_flare_rs/non_vertical_noise_config.csv b/configuration/glm_aed_flare_rs/non_vertical_noise_config.csv
new file mode 100644
index 0000000..0f83763
--- /dev/null
+++ b/configuration/glm_aed_flare_rs/non_vertical_noise_config.csv
@@ -0,0 +1,4 @@
+model_variable,model_source,model_depth_m,process_noise_sd
+lake_depth,state,NA,0.01
+extc,diagnostic,1.0,NA
+CAR_ch4_atm,diagnostic_daily,NA,0.0
\ No newline at end of file
diff --git a/configuration/glm_aed_flare_rs/observations_config_aed.csv b/configuration/glm_aed_flare_rs/observations_config_aed.csv
index 2a7d61c..7e2f7dd 100644
--- a/configuration/glm_aed_flare_rs/observations_config_aed.csv
+++ b/configuration/glm_aed_flare_rs/observations_config_aed.csv
@@ -1,7 +1,6 @@
-state_names_obs,obs_units,obs_sd,target_variable, multi_depth
-temp,degC,1,Temp_C_mean,1
-OXY_oxy,mmol m-3,15,DO_mgL_mean,1
-OGM_doc_total,mmol m-3,18.7,fDOM_QSU_mean,1
-PHY_TCHLA,ugL,2,Chla_ugL_mean,1
-secchi, unitless,0.25, Secchi_m_sample,0
-
+state_names_obs,obs_units,obs_sd,target_variable,multi_depth,model_source,model_variable,model_depth_m
+temp,degC,1,Temp_C_mean,1,NA,NA,NA
+OXY_oxy,mmol m-3,15,DO_mgL_mean,1,NA,NA,NA
+OGM_doc_total,mmol m-3,18.7,fDOM_QSU_mean,1,NA,NA,NA
+PHY_TCHLA,ugL,2,Chla_ugL_mean,1,NA,NA,NA
+secchi,unitless,0.25,Secchi_m_sample,0,diagnostic,extc_coeff,1.0
diff --git a/workflows/glm_aed_flare_rs/generate_inflow_forecast.R b/workflows/glm_aed_flare_rs/generate_inflow_forecast.R
index e853ea2..42084f1 100644
--- a/workflows/glm_aed_flare_rs/generate_inflow_forecast.R
+++ b/workflows/glm_aed_flare_rs/generate_inflow_forecast.R
@@ -1,14 +1,27 @@
 library(tidyverse)
 
-lake_directory <- here::here()
-config_set_name <- "glm_aed_flare_v3"
-configure_run_file <- "configure_run.yml"
-config <- FLAREr::set_up_simulation(configure_run_file,lake_directory, config_set_name = config_set_name)
+if (!exists("lake_directory", inherits = FALSE)) lake_directory <- here::here()
+if (!exists("config_set_name", inherits = FALSE)) config_set_name <- "glm_aed_flare_v3"
+if (!exists("configure_run_file", inherits = FALSE)) configure_run_file <- "configure_run.yml"
+if (!exists("config", inherits = FALSE)) {
+  config <- FLAREr::set_up_simulation(configure_run_file, lake_directory, config_set_name = config_set_name)
+}
 
 print('read VERA targets...')
 
-targets_vera <- readr::read_csv("https://amnh1.osn.mghpcc.org/bio230121-bucket01/vera4cast/targets/project_id=vera4cast/duration=P1D/daily-inflow-targets.csv.gz",
-                                show_col_types = FALSE)
+if(!is.null(config$s3$vera4cast_targets)){
+  targets_local <- file.path(tempdir(), "daily-inflow-targets.csv.gz")
+  FLAREr::flare_get_file(local_file = "daily-inflow-targets.csv.gz",
+                         remote_file = "daily-inflow-targets.csv.gz",
+                         server_name = "vera4cast_targets",
+                         local_folder = tempdir(),
+                         remote_folder = "vera4cast/targets/project_id=vera4cast/duration=P1D",
+                         config = config)
+  targets_vera <- readr::read_csv(targets_local, show_col_types = FALSE)
+} else {
+  targets_vera <- readr::read_csv("https://amnh1.osn.mghpcc.org/bio230121-bucket01/vera4cast/targets/project_id=vera4cast/duration=P1D/daily-inflow-targets.csv.gz",
+                                  show_col_types = FALSE)
+}
 
 inflow_hist_dates <- tibble(datetime = seq(min(targets_vera$datetime), max(targets_vera$datetime), by = "1 day"))
 
diff --git a/workflows/glm_aed_flare_rs/run_fcre_aed_forecast.R b/workflows/glm_aed_flare_rs/run_fcre_aed_forecast.R
new file mode 100644
index 0000000..026a947
--- /dev/null
+++ b/workflows/glm_aed_flare_rs/run_fcre_aed_forecast.R
@@ -0,0 +1,391 @@
+#' Run the FCRE GLM-AED FLARE forecast workflow for every ready date
+#'
+#' Resolves the GLM binary and the lake directory, loads the FLARE
+#' configuration, and then loops while both the NOAA drivers and the VERA
+#' inflow forecast for the current reference date are available. Each pass
+#' generates the inflow forecast, rebuilds the in-situ and remote-sensing
+#' target files, runs FLARE, writes the VERA-formatted forecast, scores it,
+#' advances the run configuration by one day, and submits the forecast.
+#'
+#' Side effects: sets `GLM_PATH` and AWS-related environment variables,
+#' changes the working directory to the lake directory, and writes files.
+#'
+#' @param config_set_name Name of the configuration set directory used under
+#'   `configuration/` and `workflows/`.
+#' @param configure_run_file File name of the run configuration YAML.
+#' @return `NULL`, invisibly.
+run_fcre_aed_forecast <- function(config_set_name = "glm_aed_flare_rs",
+                                  configure_run_file = "configure_run.yml") {
+
+  library(tidyverse)
+  library(lubridate)
+  set.seed(100)
+
+  # Resolve GLM_PATH: respect any pre-set value, else use a container-
+  # embedded binary if available, else fall back to GLM3r.
+  glm_path_initial <- Sys.getenv("GLM_PATH")
+  message("GLM_PATH at function entry: '", glm_path_initial, "'")
+  if (glm_path_initial == "" || glm_path_initial == "GLM3r") {
+    if (file.exists("/opt/glm/glm")) {
+      Sys.setenv("GLM_PATH" = "/opt/glm/glm")
+      message("GLM_PATH resolved to container binary: /opt/glm/glm")
+    } else if (glm_path_initial == "") {
+      Sys.setenv("GLM_PATH" = "GLM3r")
+      message("GLM_PATH resolved to GLM3r (no container binary found)")
+    }
+  }
+  options(future.globals.maxSize = 891289600)
+
+  Sys.setenv("AWS_DEFAULT_REGION" = "amnh1",
+             "AWS_S3_ENDPOINT" = "osn.mghpcc.org",
+             "USE_HTTPS" = TRUE)
+
+  marker <- file.path("configuration", config_set_name, configure_run_file)
+  candidates <- list.files("/tmp/functions",
+                           pattern = "configure_run\\.yml$",
+                           recursive = TRUE, full.names = TRUE)
+  candidates <- candidates[grepl(marker, candidates, fixed = TRUE)]
+  lake_directory <- if (length(candidates) > 0) {
+    sub(paste0("/", marker, "$"), "", candidates[1])
+  } else {
+    here::here()
+  }
+  setwd(lake_directory)
+
+  source(file.path(lake_directory, "R/convert_vera4cast_inflow.R"), local = TRUE)
+  source(file.path(lake_directory, "R/generate_forecast_score_arrow.R"), local = TRUE)
+
+  config <- FLAREr::set_up_simulation(configure_run_file = configure_run_file,
+                                      lake_directory = lake_directory,
+                                      config_set_name = config_set_name)
+
+  # Wrap check_noaa_present: its internal handler can re-signal an arrow
+  # IOError when the NOAA partition is missing, which would abort the run.
+  noaa_ready <- tryCatch(
+    FLAREr::check_noaa_present(lake_directory,
+                               configure_run_file,
+                               config_set_name = config_set_name),
+    error = function(e) {
+      message("check_noaa_present errored (treating as not-ready): ",
+              conditionMessage(e))
+      FALSE
+    }
+  )
+
+  reference_date <- lubridate::as_date(config$run_config$forecast_start_datetime)
+  inflow_prefix <- "vera4cast/forecasts/archive-parquet/project_id=vera4cast/duration=P1D/variable=Temp_C_mean/model_id=inflow_gefsClimAED"
+  s3 <- FLAREr::flare_arrow_s3_bucket(server_name = "vera4cast_forecasts",
+                                      faasr_prefix = inflow_prefix,
+                                      config = config)
+  avail_dates <- gsub("reference_date=", "", s3$ls())
+  inflow_ready <- reference_date %in% lubridate::as_date(avail_dates)
+
+  message(paste0("noaa ready: ", noaa_ready))
+  message(paste0("inflow ready: ", inflow_ready))
+
+  loop_start <- Sys.time()
+  loop_budget_seconds <- 5.5 * 60 * 60
+
+  # Scalar test: anything other than a single TRUE counts as "not ready".
+  while (isTRUE(noaa_ready) && isTRUE(inflow_ready)) {
+
+    if (as.numeric(Sys.time() - loop_start, units = "secs") > loop_budget_seconds) {
+      message("Approaching action time budget; exiting loop cleanly with restart written.")
+      break
+    }
+
+    source(file.path(lake_directory, "workflows", config_set_name, "generate_inflow_forecast.R"), local = TRUE)
+
+    insitu_local <- file.path(tempdir(), "daily-insitu-targets.csv.gz")
+    FLAREr::flare_get_file(local_file = "daily-insitu-targets.csv.gz",
+                           remote_file = "daily-insitu-targets.csv.gz",
+                           server_name = "vera4cast_targets",
+                           local_folder = tempdir(),
+                           remote_folder = "vera4cast/targets/project_id=vera4cast/duration=P1D",
+                           config = config)
+
+    readr::read_csv(insitu_local, show_col_types = FALSE) |>
+      dplyr::mutate(observation = ifelse(variable == "DO_mgL_mean", observation*1000*(1/32), observation),
+                    observation = ifelse(variable == "fDOM_QSU_mean", -151.3407 + observation*29.62654, observation),
+                    depth_m = ifelse(depth_m == 0.1, 0.0, depth_m)) |>
+      dplyr::rename(depth = depth_m) |>
+      dplyr::filter(site_id == "fcre",
+                    datetime >= as_datetime(config$run_config$start_datetime)) |>
+      dplyr::mutate(datetime = lubridate::as_datetime(datetime)) |>
+      readr::write_csv(file.path(config$file_path$qaqc_data_directory,
+                                 paste0(config$location$site_id, "-targets-insitu.csv")))
+
+    source(file.path(lake_directory, "workflows", config_set_name, "getLST.R"), local = TRUE)
+    start_iso <- format(lubridate::as_datetime(config$run_config$start_datetime), "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")
+    end_iso <- format(lubridate::as_datetime(config$run_config$forecast_start_datetime), "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")
+    data <- tryCatch(
+      get_lst(bbox, start_iso, end_iso),
+      error = function(e) { message("get_lst failed: ", conditionMessage(e)); NULL }
+    )
+    vals <- if (!is.null(data)) {
+      tryCatch(get_vals(points, data), error = function(e) NULL)
+    } else NULL
+    if (!is.null(vals)) {
+      clean_data(vals) |>
+        readr::write_csv(file.path(config$file_path$qaqc_data_directory,
+                                   paste0(config$location$site_id, "-targets-rs.csv")))
+    } else {
+      message("No new RS data")
+      data <- as.data.frame(cbind("datetime" = paste0(substr(as.character(config$run_config$start_datetime), 0, 10), "T00:00:00Z"),
+                                  "observation" = NA,
+                                  "site_id" = "fcre",
+                                  "depth" = 0,
+                                  "variable" = "temperature"))
+      data |>
+        readr::write_csv(file.path(config$file_path$qaqc_data_directory,
+                                   paste0(config$location$site_id, "-targets-rs.csv")))
+    }
+
+    flare_result <- FLAREr::run_flare(lake_directory = lake_directory,
+                                      configure_run_file = configure_run_file,
+                                      config_set_name = config_set_name)
+
+    ref_date <- as.character(lubridate::as_date(config$run_config$forecast_start_datetime))
+    forecasts_s3 <- FLAREr::flare_arrow_s3_bucket(
+      server_name = "forecasts_parquet",
+      faasr_prefix = paste0("flare/forecasts/parquet",
+                            "/site_id=", config$location$site_id,
+                            "/model_id=", config$run_config$sim_name,
+                            "/reference_date=", ref_date),
+      local_path = file.path(lake_directory, "forecasts", "parquet",
+                             paste0("site_id=", config$location$site_id),
+                             paste0("model_id=", config$run_config$sim_name),
+                             paste0("reference_date=", ref_date)),
+      config = config
+    )
+    forecast_df <- arrow::open_dataset(forecasts_s3) |>
+      collect() |>
+      mutate(datetime = lubridate::as_datetime(datetime),
+             site_id = config$location$site_id,
+             model_id = config$run_config$sim_name,
+             reference_date = ref_date)
+
+    vera_variables <- c("Temp_C_mean","Chla_ugL_mean", "DO_mgL_mean", "fDOM_QSU_mean", "NH4_ugL_sample",
+                        "NO3NO2_ugL_sample", "SRP_ugL_sample", "DIC_mgL_sample","Secchi_m_sample",
+                        "Bloom_binary_mean","CH4_umolL_sample","IceCover_binary_max", "CO2flux_umolm2s_mean", "CH4flux_umolm2s_mean",
+                        "Mixed_binary_mean")
+
+    bloom_binary <- forecast_df |>
+      dplyr::filter(depth == 1.6 & variable == "Chla_ugL_mean") |>
+      dplyr::mutate(over = ifelse(prediction > 20, 1, 0)) |>
+      dplyr::summarize(prediction = sum(over) / n(), .by = c(datetime, reference_datetime, model_id, site_id, depth, variable)) |>
+      dplyr::mutate(family = "bernoulli",
+                    parameter = "prob",
+                    variable = "Bloom_binary_mean",
+                    datetime = lubridate::as_datetime(datetime)) |>
+      dplyr::rename(depth_m = depth) |>
+      dplyr::select(reference_datetime, datetime, model_id, site_id, depth_m, family, parameter, variable, prediction)
+
+    ice_binary <- forecast_df |>
+      dplyr::filter(variable == "ice_thickness") |>
+      dplyr::mutate(over = ifelse(prediction > 0, 1, 0)) |>
+      dplyr::summarize(prediction = sum(over) / n(), .by = c(datetime, reference_datetime, model_id, site_id, depth, variable)) |>
+      dplyr::mutate(family = "bernoulli",
+                    parameter = "prob",
+                    variable = "IceCover_binary_max",
+                    depth = NA,
+                    datetime = lubridate::as_datetime(datetime)) |>
+      dplyr::rename(depth_m = depth) |>
+      dplyr::select(reference_datetime, datetime, model_id, site_id, depth_m, family, parameter, variable, prediction)
+
+    min_depth <- 1
+    max_depth <- 8
+    threshold <- 0.1
+
+    temp_forecast <- forecast_df |>
+      filter(variable %in% c("temp_1.0m_mean","temp_8.0m_mean")) |>
+      mutate(depth = ifelse(variable == "temp_1.0m_mean", 1.0, 8.0),
+             variable = "Temp_C_mean",
+             datetime = lubridate::as_datetime(datetime - lubridate::days(1))) |>
+      pivot_wider(names_from = depth, names_prefix = "wtr_", values_from = prediction)
+
+    colnames(temp_forecast)[which(colnames(temp_forecast) == paste0("wtr_", min_depth))] <- "min_depth"
+    colnames(temp_forecast)[which(colnames(temp_forecast) == paste0("wtr_", max_depth))] <- "max_depth"
+
+    mix_binary <- temp_forecast |>
+      mutate(min_depth = rLakeAnalyzer::water.density(min_depth),
+             max_depth = rLakeAnalyzer::water.density(max_depth),
+             mixed = ifelse((max_depth - min_depth) < threshold, 1, 0)) |>
+      summarise(prediction = (sum(mixed)/n()), .by = c(datetime, reference_datetime, model_id, site_id, variable)) |>
+      dplyr::mutate(family = "bernoulli",
+                    parameter = "prob",
+                    variable = "Mixed_binary_mean",
+                    depth = NA,
+                    datetime = lubridate::as_datetime(datetime)) |>
+      dplyr::rename(depth_m = depth) |>
+      dplyr::select(reference_datetime, datetime, model_id, site_id, depth_m, family, parameter, variable, prediction)
+
+    vera4cast_df <- forecast_df |>
+      dplyr::rename(depth_m = depth) |>
+      dplyr::mutate(variable = ifelse(variable == "oxy_mean", "DO_mgL_mean", variable),
+                    depth_m = ifelse(variable == "DO_mgL_mean", 1.6, depth_m),
+                    datetime = ifelse(variable == "DO_mgL_mean", datetime - lubridate::days(1), datetime),
+                    prediction = ifelse(variable == "DO_mgL_mean", prediction/1000*(32), prediction),
+                    variable = ifelse(variable == "Temp_C_mean", "Temp_C_mean_all_depth", variable),
+                    variable = ifelse(variable == "temp_1.6m_mean", "Temp_C_mean", variable),
+                    depth_m = ifelse(variable == "Temp_C_mean", 1.6, depth_m),
+                    datetime = ifelse(variable == "Temp_C_mean", datetime - lubridate::days(1), datetime),
+                    prediction = ifelse(variable == "fDOM_QSU_mean", (151.3407 + prediction)/29.62654, prediction),
+                    prediction = ifelse(variable == "NIT_amm", prediction/1000/0.001/(1/18.04), prediction),
+                    variable = ifelse(variable == "NIT_amm", "NH4_ugL_sample", variable),
+                    prediction = ifelse(variable == "NIT_nit", prediction/1000/0.001/(1/62.00), prediction),
+                    # Match NIT_nit here; NIT_amm was already renamed to NH4_ugL_sample above.
+                    variable = ifelse(variable == "NIT_nit", "NO3NO2_ugL_sample", variable),
+                    prediction = ifelse(variable == "PHS_frp", prediction/1000/0.001/(1/94.9714), prediction),
+                    variable = ifelse(variable == "PHS_frp", "SRP_ugL_sample", variable),
+                    prediction = ifelse(variable == "CAR_dic", prediction/1000/(1/52.515), prediction),
+                    variable = ifelse(variable == "CAR_dic", "DIC_mgL_sample", variable),
+                    variable = ifelse(variable == "CAR_ch4", "CH4_umolL_sample", variable),
+                    variable = ifelse(variable == "secchi", "Secchi_m_sample", variable),
+                    prediction = ifelse(variable == "co2_flux_mean", prediction/0.001/86400, prediction),
+                    variable = ifelse(variable == "co2_flux_mean", "CO2flux_umolm2s_mean", variable),
+                    prediction = ifelse(variable == "ch4_flux_mean", prediction/0.001/86400, prediction),
+                    variable = ifelse(variable == "ch4_flux_mean", "CH4flux_umolm2s_mean", variable),
+                    depth_m = ifelse(depth_m == 0.0, 0.1, depth_m),
+                    datetime = lubridate::as_datetime(datetime)) |>
+      dplyr::select(-forecast, -variable_type) |>
+      dplyr::mutate(parameter = as.character(parameter)) |>
+      dplyr::bind_rows(bloom_binary) |>
+      dplyr::bind_rows(ice_binary) |>
+      dplyr::bind_rows(mix_binary) |>
+      dplyr::filter(variable %in% vera_variables) |>
+      mutate(project_id = "vera4cast",
+             model_id = config$run_config$sim_name,
+             family = "ensemble",
+             site_id = "fcre",
+             duration = "P1D",
+             datetime = lubridate::as_datetime(datetime),
+             reference_datetime = lubridate::as_datetime(reference_datetime)) |>
+      filter(datetime >= reference_datetime) |>
+      distinct(reference_datetime, datetime, variable, depth_m, parameter, model_id, .keep_all = TRUE)
+
+    file_name <- paste0(config$run_config$sim_name, "-",
+                        lubridate::as_date(vera4cast_df$reference_datetime[1]), ".csv.gz")
+    readr::write_csv(vera4cast_df, file = file_name)
+
+    message("Scoring forecasts")
+
+    forecasts_s3 <- FLAREr::flare_arrow_s3_bucket(
+      server_name = "forecasts_parquet",
+      faasr_prefix = paste0("flare/forecasts/parquet",
+                            "/site_id=", config$location$site_id,
+                            "/model_id=", config$run_config$sim_name,
+                            "/reference_date=", as.character(lubridate::as_date(config$run_config$forecast_start_datetime))),
+      local_path = file.path(lake_directory, "forecasts", "parquet",
+                             paste0("site_id=", config$location$site_id),
+                             paste0("model_id=", config$run_config$sim_name),
+                             paste0("reference_date=", as.character(lubridate::as_date(config$run_config$forecast_start_datetime)))),
+      config = config
+    )
+    forecast_df <- arrow::open_dataset(forecasts_s3) |>
+      dplyr::collect() |>
+      dplyr::mutate(site_id = config$location$site_id,
+                    model_id = config$run_config$sim_name,
+                    reference_date = lubridate::as_date(config$run_config$forecast_start_datetime))
+
+    if (isTRUE(config$output_settings$evaluate_past) && isTRUE(config$run_config$use_s3)) {
+      past_days <- lubridate::as_date(lubridate::as_date(config$run_config$forecast_start_datetime) -
+                                        lubridate::days(config$run_config$forecast_horizon))
+      past_s3 <- FLAREr::flare_arrow_s3_bucket(
+        server_name = "forecasts_parquet",
+        faasr_prefix = paste0("flare/forecasts/parquet",
+                              "/site_id=", config$location$site_id,
+                              "/model_id=", config$run_config$sim_name,
+                              "/reference_date=", as.character(past_days)),
+        config = config
+      )
+      past_forecasts <- arrow::open_dataset(past_s3) |>
+        dplyr::collect() |>
+        dplyr::mutate(site_id = config$location$site_id,
+                      model_id = config$run_config$sim_name,
+                      reference_date = past_days)
+    } else {
+      past_forecasts <- NULL
+    }
+
+    combined_forecasts <- dplyr::bind_rows(forecast_df, past_forecasts)
+
+    targets_df <- read_csv(file.path(config$file_path$qaqc_data_directory,
+                                     paste0(config$location$site_id, "-targets-insitu.csv")),
+                           show_col_types = FALSE)
+
+    scoring <- generate_forecast_score_arrow(targets_df = targets_df,
+                                             forecast_df = combined_forecasts,
+                                             use_s3 = config$run_config$use_s3,
+                                             bucket = config$s3$scores$bucket,
+                                             endpoint = config$s3$scores$endpoint,
+                                             local_directory = "./scores/fcre",
+                                             variable_types = c("state", "parameter"),
+                                             config = config)
+
+    forecast_start_datetime <- lubridate::as_datetime(config$run_config$forecast_start_datetime) + lubridate::days(1)
+    # write_restart persists GLM output timesteps starting from spinup_start + 1,
+    # so the next iteration's start_datetime must be at least one day after the
+    # current spinup_start to land on a saved restart date.
+    start_datetime <- lubridate::as_datetime(config$run_config$forecast_start_datetime) - lubridate::days(3)
+    # Read the actual filename produced by run_flare; the extension depends on
+    # whether GLM restart staging emitted a .zip or a plain .nc.
+    restart_file <- basename(flare_result$restart_file)
+
+    FLAREr::update_run_config(lake_directory = lake_directory,
+                              configure_run_file = configure_run_file,
+                              restart_file = restart_file,
+                              start_datetime = start_datetime,
+                              end_datetime = NA,
+                              forecast_start_datetime = forecast_start_datetime,
+                              forecast_horizon = config$run_config$forecast_horizon,
+                              sim_name = config$run_config$sim_name,
+                              site_id = config$location$site_id,
+                              configure_flare = config$run_config$configure_flare,
+                              configure_obs = config$run_config$configure_obs,
+                              use_s3 = config$run_config$use_s3,
+                              bucket = config$s3$restart$bucket,
+                              endpoint = config$s3$restart$endpoint,
+                              config = config,
+                              use_https = TRUE)
+
+    # Refresh the in-memory config so sourced helpers on the next iteration
+    # see the updated forecast_start_datetime / restart_file that
+    # update_run_config just wrote to disk.
+    config <- FLAREr::set_up_simulation(configure_run_file = configure_run_file,
+                                        lake_directory = lake_directory,
+                                        config_set_name = config_set_name)
+
+    var1 <- Sys.getenv("AWS_ACCESS_KEY_ID")
+    var2 <- Sys.getenv("AWS_SECRET_ACCESS_KEY")
+    Sys.unsetenv("AWS_ACCESS_KEY_ID")
+    Sys.unsetenv("AWS_SECRET_ACCESS_KEY")
+    tryCatch(
+      vera4castHelpers::submit(file_name, first_submission = FALSE),
+      error = function(e) message("vera4cast submit failed (continuing): ", conditionMessage(e))
+    )
+    Sys.setenv("AWS_ACCESS_KEY_ID" = var1,
+               "AWS_SECRET_ACCESS_KEY" = var2)
+
+    noaa_ready <- tryCatch(
+      FLAREr::check_noaa_present(lake_directory,
+                                 configure_run_file,
+                                 config_set_name = config_set_name),
+      error = function(e) {
+        message("check_noaa_present errored (treating as not-ready): ",
+                conditionMessage(e))
+        FALSE
+      }
+    )
+
+    reference_date <- lubridate::as_date(forecast_start_datetime)
+    s3 <- FLAREr::flare_arrow_s3_bucket(server_name = "vera4cast_forecasts",
+                                        faasr_prefix = inflow_prefix,
+                                        config = config)
+    avail_dates <- gsub("reference_date=", "", s3$ls())
+    # Parse the listing to Date, matching the readiness check before the loop.
+    inflow_ready <- reference_date %in% lubridate::as_date(avail_dates)
+  }
+
+  invisible(NULL)
+}