Skip to content
1 change: 1 addition & 0 deletions docs/changes/2188.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow NSHOW to follow a pre-defined power law for simulation production grid definition.
11 changes: 11 additions & 0 deletions docs/source/api-reference/production_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ the calculation of the number of events to be simulated given a pre-determined m
:members:
```


(job-spec-builder)=

## job_spec_builder

```{eval-rst}
.. automodule:: production_configuration.job_spec_builder
:members:
```


(interpolation-handler)=

## interpolation_handler
Expand Down
25 changes: 25 additions & 0 deletions src/simtools/applications/simulate_prod_htcondor_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
Base directory for simulation output packages passed through as ``pack_for_grid_register``.
priority (int, optional)
Job priority (default: 1).
nshow_power_index (float, optional)
Power-law index used to scale the baseline ``nshow`` for each ``energy_range`` entry.
nshow_reference_energy (quantity, optional)
Reference energy used for ``nshow`` power-law scaling (e.g. ``"100 GeV"``).
Required when ``nshow_power_index`` is configured.

(all other command line arguments are identical to those of :ref:`simulate_prod`).

Expand Down Expand Up @@ -97,6 +102,26 @@ def _add_arguments(parser):
required=False,
default=None,
)
parser.add_argument(
"--nshow_power_index",
help=(
"Power-law index used to scale the baseline nshow with the geometric-mean energy "
"of each energy_range entry."
),
type=float,
required=False,
default=None,
)
parser.add_argument(
"--nshow_reference_energy",
help=(
"Reference energy for nshow power-law scaling (for example: '100 GeV'). "
"Required together with --nshow_power_index."
),
type=str,
required=False,
default=None,
)


def main():
Expand Down
199 changes: 8 additions & 191 deletions src/simtools/job_execution/htcondor_script_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,19 @@

"""

import ast
import itertools
import logging
import re
from pathlib import Path

import astropy.units as u

import simtools.version as simtools_version
from simtools.configuration import defaults
from simtools.production_configuration.job_spec_builder import (
build_job_specs,
resolve_array_layout_name,
)

_logger = logging.getLogger(__name__)

_GRID_AXES = [
"primary",
"azimuth_angle",
"zenith_angle",
"model_version",
"corsika_le_interaction",
"corsika_he_interaction",
]

_GRID_AXIS_DEFAULTS = {
"corsika_le_interaction": defaults.CORSIKA_LE_INTERACTION,
"corsika_he_interaction": defaults.CORSIKA_HE_INTERACTION,
}

_PARAMS_FIELDS = [
"apptainer_label",
"primary",
Expand All @@ -56,45 +42,6 @@
]


def _normalize_to_list(value):
"""Normalize scalar values to lists of length one."""
if isinstance(value, list):
return value
if isinstance(value, tuple):
return list(value)
return [value]


def _normalize_grid_axes(args_dict):
"""Return normalized grid axes for cartesian product expansion."""
return {
axis: (
_normalize_to_list(args_dict[axis])
if axis in args_dict and args_dict[axis] is not None
else [_GRID_AXIS_DEFAULTS[axis]]
if axis in _GRID_AXIS_DEFAULTS
else [None]
)
for axis in _GRID_AXES
}


def _normalize_energy_ranges(energy_range):
"""Normalize energy range argument to a list of (e_min, e_max) pairs."""
if isinstance(energy_range, tuple) and len(energy_range) == 2:
return [energy_range]

if isinstance(energy_range, list):
if len(energy_range) == 2 and all(hasattr(item, "to") for item in energy_range):
return [(energy_range[0], energy_range[1])]
if all(isinstance(item, (list, tuple)) and len(item) == 2 for item in energy_range):
return [tuple(item) for item in energy_range]

raise ValueError(
"energy_range must be one pair (e_min, e_max) or a list of (e_min, e_max) pairs."
)


def _resolve_apptainer_images(apptainer_image_arg):
"""
Resolve and validate apptainer image configuration.
Expand Down Expand Up @@ -179,141 +126,11 @@ def _sanitize_label_for_params(label):
return re.sub(r"\s+", "_", str(label).strip())


def _resolve_array_layout_name(array_layout_name, model_version):
"""Resolve array layout configuration for a specific model version."""
if isinstance(array_layout_name, list) and len(array_layout_name) == 1:
array_layout_name = array_layout_name[0]

# Configurator/_fill_config stringifies dict values when rebuilding argparse arguments.
if isinstance(array_layout_name, str) and array_layout_name.strip().startswith("{"):
try:
parsed_layout = ast.literal_eval(array_layout_name)
if isinstance(parsed_layout, dict):
array_layout_name = parsed_layout
except (SyntaxError, ValueError):
return array_layout_name

if not isinstance(array_layout_name, dict) or list(array_layout_name) != ["by_version"]:
return array_layout_name

resolved = simtools_version.resolve_by_version(
{"array_layout_name": array_layout_name}, model_version
)
return resolved["array_layout_name"]


def _get_energy_range_for_zenith_angle(zenith_angle, energy_range_pair, corsika_limits):
"""
Return a zenith-dependent energy range pair or None to skip the simulation step.

Dummy function - to be implemented.
"""
_ = (zenith_angle, corsika_limits)
return energy_range_pair


def _get_core_scatter_max_for_zenith_angle(zenith_angle, core_scatter, corsika_limits):
"""Return zenith-dependent max core-scatter value.

Dummy function - to be implemented.
"""
_ = (zenith_angle, corsika_limits)
return core_scatter[1]


def _get_nshow_for_energy_range_and_zenith_angle(
energy_range_pair, zenith_angle, nshow, corsika_limits
):
"""Return nshow that may depend on energy range and zenith angle.

Dummy function - to be implemented.
"""
_ = (energy_range_pair, zenith_angle, corsika_limits)
return nshow


def _build_job_specs(args_dict, image_labels):
"""Build backend-agnostic job specs from comparison and production grids."""
grid_axes = _normalize_grid_axes(args_dict)
energy_ranges = _normalize_energy_ranges(args_dict["energy_range"])
base_pack_dir = args_dict.get("simulation_output") or "simtools-output"
corsika_limits = args_dict.get("corsika_limits")
core_scatter = args_dict["core_scatter"]
nshow = args_dict["nshow"]

combinations = list(
itertools.product(
grid_axes["primary"],
grid_axes["azimuth_angle"],
grid_axes["zenith_angle"],
grid_axes["model_version"],
grid_axes["corsika_le_interaction"],
grid_axes["corsika_he_interaction"],
energy_ranges,
)
)

number_of_runs = args_dict.get("number_of_runs", 1)
run_number = int(args_dict.get("run_number") or 1)

job_specs = []
for label in image_labels:
params_label = _sanitize_label_for_params(label)
row_index = 0
for (
primary,
azimuth,
zenith,
model_version,
corsika_le,
corsika_he,
energy_range_pair,
) in combinations:
selected_energy_range_pair = energy_range_pair
selected_core_scatter_max = core_scatter[1]
selected_nshow = nshow

if corsika_limits is not None:
selected_energy_range_pair = _get_energy_range_for_zenith_angle(
zenith, energy_range_pair, corsika_limits
)
if selected_energy_range_pair is None:
continue
selected_core_scatter_max = _get_core_scatter_max_for_zenith_angle(
zenith, core_scatter, corsika_limits
)
selected_nshow = _get_nshow_for_energy_range_and_zenith_angle(
selected_energy_range_pair, zenith, nshow, corsika_limits
)

for _ in range(number_of_runs):
job_specs.append(
{
"apptainer_label": str(label),
"primary": primary,
"azimuth_angle": azimuth,
"zenith_angle": zenith,
"model_version": model_version,
"array_layout_name": args_dict.get("array_layout_name"),
"corsika_le_interaction": corsika_le,
"corsika_he_interaction": corsika_he,
"energy_min": selected_energy_range_pair[0],
"energy_max": selected_energy_range_pair[1],
"core_scatter_max": selected_core_scatter_max,
"nshow": selected_nshow,
"pack_for_grid_register": f"{base_pack_dir}/{params_label}",
"run_number": run_number + row_index,
}
)
row_index += 1
return job_specs


def _group_job_specs_by_label(job_specs):
"""Group job specs by apptainer image label."""
grouped = {}
for job_spec in job_specs:
label = job_spec["apptainer_label"]
label = job_spec["image_label"]
grouped.setdefault(label, []).append(job_spec)
return grouped

Expand All @@ -322,7 +139,7 @@ def _write_params_file(params_file_path, label_job_specs):
"""Write parameter file consumed by HTCondor queue-from syntax."""
with open(params_file_path, "w", encoding="utf-8") as params_file_handle:
for job_spec in label_job_specs:
array_layout_name = _resolve_array_layout_name(
array_layout_name = resolve_array_layout_name(
job_spec["array_layout_name"], job_spec["model_version"]
)

Expand All @@ -337,7 +154,7 @@ def _write_params_file(params_file_path, label_job_specs):
)

row = [
_format_param_value(job_spec["apptainer_label"], "apptainer_label"),
_format_param_value(job_spec["image_label"], "apptainer_label"),
_format_param_value(job_spec["primary"], "primary"),
_format_param_value(job_spec["azimuth_angle"], "azimuth_angle"),
_format_param_value(job_spec["zenith_angle"], "zenith_angle"),
Expand Down Expand Up @@ -368,7 +185,7 @@ def generate_submission_script(args_dict):
Arguments dictionary.
"""
apptainer_images = _resolve_apptainer_images(args_dict["apptainer_image"])
job_specs = _build_job_specs(args_dict, list(apptainer_images.keys()))
job_specs = build_job_specs(args_dict, list(apptainer_images.keys()))
grouped_job_specs = _group_job_specs_by_label(job_specs)

work_dir = Path(args_dict["output_path"])
Expand Down
Loading