From e1a5cebbe907d5bda0f6a1f66f2e9f6740800888 Mon Sep 17 00:00:00 2001 From: Michele Simionato Date: Sat, 30 May 2026 04:42:28 +0200 Subject: [PATCH 1/4] Fixed setting environment variables in workflows --- demos/hazard/AreaSourceClassicalPSHA/job.toml | 1 + openquake/engine/engine.py | 2 +- openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/demos/hazard/AreaSourceClassicalPSHA/job.toml b/demos/hazard/AreaSourceClassicalPSHA/job.toml index 4d3aaec77416..f01047de5405 100644 --- a/demos/hazard/AreaSourceClassicalPSHA/job.toml +++ b/demos/hazard/AreaSourceClassicalPSHA/job.toml @@ -1,5 +1,6 @@ [workflow] description = "AreaSource demo workflow" +env.OQ_SAMPLE_SITES = 0.1 [Job] ini = "job.ini" diff --git a/openquake/engine/engine.py b/openquake/engine/engine.py index 2f18297a6142..017e23db5dc5 100644 --- a/openquake/engine/engine.py +++ b/openquake/engine/engine.py @@ -458,7 +458,7 @@ def __init__(self, workflow_toml, defaults, ddic, prefix=''): self.may_fail = self.defaults.pop('may_fail', []) # set the passed environment variables if not already set - env = defaults.get('workflow', {}).get('env', {}) + env = defaults.get('env', {}) for k, v in env.items(): if k not in os.environ: os.environ[k] = str(v) diff --git a/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml b/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml index 48dcb2d3f0ce..7e921f1007fe 100644 --- a/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml +++ b/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml @@ -1,5 +1,6 @@ [multi.workflow] description = "multi-workflow" +env.OQ_SAMPLE_ASSETS = 0.1 may_fail = ["riskJob"] replace = {out_file="SES.hdf5", hazard_calculation_id="SES.hdf5"} From 02f7dd66f8ec99b48cb9047d3a4fba9c1d39ca66 Mon Sep 17 00:00:00 2001 From: Michele Simionato Date: Sat, 30 May 2026 05:03:08 +0200 Subject: [PATCH 2/4] Refactored _Workflow --- openquake/engine/engine.py | 47 +++++++++++++++---- .../event_based_risk/case_4a/jobs.toml | 2 +- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/openquake/engine/engine.py b/openquake/engine/engine.py index 017e23db5dc5..fc4308ad5c68 100644 --- a/openquake/engine/engine.py +++ b/openquake/engine/engine.py @@ -449,6 +449,7 @@ def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False, class _Workflow: # workflow objects are instantiated by the function `read_many` + # prefix is empty unless we are in a multi-workflow def __init__(self, workflow_toml, defaults, ddic, prefix=''): self.workflow_toml = workflow_toml self.workflow_dir = os.path.dirname(workflow_toml) @@ -456,20 +457,21 @@ def __init__(self, workflow_toml, defaults, ddic, prefix=''): self.description = defaults.pop('description') self.checkout = self.defaults.pop('checkout', {}) # repo->branch self.may_fail = self.defaults.pop('may_fail', []) + self.env = defaults.pop('env', {}) + self.override = defaults.pop('override', {}) # set the passed environment variables if not already set - env = defaults.get('env', {}) - for k, v in env.items(): + for k, v in self.env.items(): if k not in os.environ: os.environ[k] = str(v) - # override feature for multi-workflows - repl = defaults.get('workflow', {}).get('override', {}) - if repl: + # override feature + if self.override: + breakpoint() for _, dic in ddic.items(): for name in dic: - if name in repl: - dic[name] = repl[name] + if name in self.override: + dic[name] = self.override[name] # check the repositories exist for value in self.checkout: @@ -500,9 +502,34 @@ def __init__(self, workflow_toml, defaults, ddic, prefix=''): inis.append(dic) names.append(prefix + k) - check_unique(names, workflow_toml) - self.inis = numpy.array(inis) - self.names = numpy.array(names) + self.inis, self.names, self.success = self._ini_name_success( + ddic, prefix) + check_unique(self.names, workflow_toml) + + def _ini_name_success(self, ddic, prefix): + inis = [] + names = [] + success = [] + for k, dic in ddic.items(): + assert len(k) <= 20, k + if k == 'success': + if isinstance(dic, dict): + self.success = [dic] + elif isinstance(dic, list): + self.success = dic + else: + raise ValueError('"success": %s', dic) + for s in self.success: + s['func'] # each success dictionary must contain a func + self.fix_paths(self.success) + continue + + assert k[0].isupper(), k + assert isinstance(dic, dict), dic + self.fix_paths([dic]) + inis.append(dic) + names.append(prefix + k) + return numpy.array(inis), numpy.array(names), success def fix_paths(self, dicts): """ diff --git a/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml b/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml index 7e921f1007fe..5de2a393b87c 100644 --- a/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml +++ b/openquake/qa_tests_data/event_based_risk/case_4a/jobs.toml @@ -2,7 +2,7 @@ description = "multi-workflow" env.OQ_SAMPLE_ASSETS = 0.1 may_fail = ["riskJob"] -replace = {out_file="SES.hdf5", hazard_calculation_id="SES.hdf5"} +override = {out_file="SES.hdf5", hazard_calculation_id="SES.hdf5"} [hazard.Job] ini = "job_hazard.ini" From f8dbfe6822b32f63a35fe96016b81757ff17de02 Mon Sep 17 00:00:00 2001 From: Michele Simionato Date: Sat, 30 May 2026 05:17:44 +0200 Subject: [PATCH 3/4] Removed accidental breakpoint --- openquake/engine/engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/openquake/engine/engine.py b/openquake/engine/engine.py index fc4308ad5c68..09cd2a0a5653 100644 --- a/openquake/engine/engine.py +++ b/openquake/engine/engine.py @@ -467,7 +467,6 @@ def __init__(self, workflow_toml, defaults, ddic, prefix=''): # override feature if self.override: - breakpoint() for _, dic in ddic.items(): for name in dic: if name in self.override: From 950b3ad8c94080b8d5eae25b2e91a3700eeacb0c Mon Sep 17 00:00:00 2001 From: Michele Simionato Date: Sat, 30 May 2026 05:27:28 +0200 Subject: [PATCH 4/4] Fixed test --- openquake/engine/engine.py | 32 ++++---------------------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/openquake/engine/engine.py b/openquake/engine/engine.py index 09cd2a0a5653..a8078d1d8d35 100644 --- a/openquake/engine/engine.py +++ b/openquake/engine/engine.py @@ -478,37 +478,13 @@ def __init__(self, workflow_toml, defaults, ddic, prefix=''): if not os.path.exists(repodir): raise FileNotFoundError(repodir) - inis = [] - names = [] - self.success = [] - for k, dic in ddic.items(): - assert len(k) <= 20, k - if k == 'success': - if isinstance(dic, dict): - self.success = [dic] - elif isinstance(dic, list): - self.success = dic - else: - raise ValueError('"success": %s', dic) - for s in self.success: - s['func'] # each success dictionary must contain a func - self.fix_paths(self.success) - continue - - assert k[0].isupper(), k - assert isinstance(dic, dict), dic - self.fix_paths([dic]) - inis.append(dic) - names.append(prefix + k) - - self.inis, self.names, self.success = self._ini_name_success( - ddic, prefix) + self.inis, self.names = self._inis_names(ddic, prefix) check_unique(self.names, workflow_toml) - def _ini_name_success(self, ddic, prefix): + def _inis_names(self, ddic, prefix): inis = [] names = [] - success = [] + self.success = [] for k, dic in ddic.items(): assert len(k) <= 20, k if k == 'success': @@ -528,7 +504,7 @@ def _ini_name_success(self, ddic, prefix): self.fix_paths([dic]) inis.append(dic) names.append(prefix + k) - return numpy.array(inis), numpy.array(names), success + return numpy.array(inis), numpy.array(names) def fix_paths(self, dicts): """