Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/reflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@
# Python interpreter used for worker and dispatch jobs.
# python = "/path/to/python"

# Cap on how many array tasks reflow submits at once (per wave).
# Set this to your scheduler's per-user job-submit limit
# (e.g. Slurm AssocMaxSubmitJobLimit). A large array task is then
# submitted in waves of at most this many indices; the remainder stay
# PENDING and are submitted as each wave drains, so the number of
# queued jobs never exceeds the cap. Leave a little headroom below
# the true limit for the dispatch job and any singleton tasks.
# Unset = no cap (submit the whole array at once).
# This is a reflow-internal scheduling cap, not a scheduler flag, so it
# is a plain [executor] key (not under [executor.submit_options]).
# max_submit_jobs = 900

# Override scheduler command paths if they are not on $PATH.
# Slurm:
# sbatch = "/usr/bin/sbatch"
Expand Down Expand Up @@ -313,6 +325,35 @@ def executor_python(self) -> str | None:
def executor_mode(self) -> str | None:
return self._get("executor", "mode", "REFLOW_MODE")

@property
def max_submit_jobs(self) -> int | None:
"""Cap on array tasks submitted per wave.

Set this to your scheduler's per-user job-submit limit (e.g. Slurm
``AssocMaxSubmitJobLimit``). When set, a large array task is
submitted in waves of at most this many indices; the remainder stay
PENDING and are submitted by the dependency-triggered follow-up
dispatch as each wave drains, so the number of queued jobs never
exceeds the cap. ``None`` (the default) disables capping and submits
the whole array at once. Leave a little headroom below the true
limit for the follow-up dispatch job itself and any concurrent
singleton tasks.

This is a reflow-internal scheduling cap, *not* a scheduler flag, so
it is a plain ``[executor]`` key rather than a ``submit_options``
entry (those are rendered onto the sbatch command line).

Config: ``[executor] max_submit_jobs``; env: ``REFLOW_MAX_SUBMIT_JOBS``.
"""
raw = self._get("executor", "max_submit_jobs", "REFLOW_MAX_SUBMIT_JOBS")
if raw is None:
return None
try:
value = int(raw)
except ValueError:
return None
return value if value > 0 else None

@property
def executor_sbatch(self) -> str | None:
return self._get_submit_option("sbatch")
Expand Down
29 changes: 28 additions & 1 deletion src/reflow/stores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,16 @@ def update_task_submitted(
run_id: str,
task_name: str,
job_id: str,
indices: list[int] | None = None,
) -> None:
"""Mark pending/retrying instances as submitted."""
"""Mark pending/retrying instances as submitted.

When ``indices`` is ``None`` all pending/retrying instances of the
task are marked (singleton tasks and full-array submits). When a
list of array indices is given, only those instances are marked,
so a large array can be submitted in capped waves, each wave with
its own job id.
"""

@abc.abstractmethod
def update_task_running(self, instance_id: int) -> None:
Expand Down Expand Up @@ -224,6 +232,25 @@ def update_task_success(
def update_task_failed(self, instance_id: int, error_text: str) -> None:
"""Mark one instance as failed."""

@abc.abstractmethod
def fail_pending_tasks(
self,
run_id: str,
task_name: str,
error_text: str,
indices: list[int] | None = None,
) -> int:
"""Mark not-yet-running instances of a task as FAILED.

Mirrors :meth:`update_task_submitted`: with ``indices=None`` every
not-yet-running instance of the task is failed; with a list of
array indices, only those are. Used when the scheduler cannot
place work (e.g. the batch system rejected the submission) so the
run finalises as FAILED instead of hanging with instances stuck in
PENDING/SUBMITTED. Only states that have not started executing are
affected. Returns the number of instances updated.
"""

@abc.abstractmethod
def update_task_cancelled(self, instance_id: int) -> None:
"""Mark one instance as cancelled."""
Expand Down
77 changes: 66 additions & 11 deletions src/reflow/stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,21 +620,76 @@ def update_task_submitted(
run_id: str,
task_name: str,
job_id: str,
indices: list[int] | None = None,
) -> None:
self.conn.execute(
"""Mark pending/retrying instances of a task as SUBMITTED.

With ``indices=None`` every pending/retrying instance of the task
is marked (singleton tasks and full-array submits). With a list of
array indices, only those instances are marked, so a large array
can be submitted in capped waves, each wave carrying its own
``job_id``.
"""
params: list[Any] = [
TaskState.SUBMITTED.value,
job_id,
_utcnow(),
run_id,
task_name,
TaskState.PENDING.value,
TaskState.RETRYING.value,
]
sql = (
"UPDATE task_instances SET state = ?, job_id = ?, updated_at = ? "
"WHERE run_id = ? AND task_name = ? AND state IN (?, ?)",
(
TaskState.SUBMITTED.value,
job_id,
_utcnow(),
run_id,
task_name,
TaskState.PENDING.value,
TaskState.RETRYING.value,
),
"WHERE run_id = ? AND task_name = ? AND state IN (?, ?)"
)
if indices is not None:
idx = list(indices)
if not idx:
return
sql += " AND array_index IN ({})".format(",".join("?" * len(idx)))
params.extend(idx)
self.conn.execute(sql, params)
self.conn.commit()

@_retry_on_locked
def fail_pending_tasks(
self,
run_id: str,
task_name: str,
error_text: str,
indices: list[int] | None = None,
) -> int:
"""Mark not-yet-running instances of *task_name* as FAILED.

Used when the scheduler cannot place work (e.g. the batch system
rejected the submission), so the run finalises as FAILED instead
of hanging with tasks stuck in PENDING/SUBMITTED. Only states that
have not started executing are affected. Returns the row count.
"""
params: list[Any] = [
TaskState.FAILED.value,
error_text,
_utcnow(),
run_id,
task_name,
TaskState.PENDING.value,
TaskState.RETRYING.value,
TaskState.SUBMITTED.value,
]
sql = (
"UPDATE task_instances SET state = ?, error_text = ?, updated_at = ? "
"WHERE run_id = ? AND task_name = ? AND state IN (?, ?, ?)"
)
if indices is not None:
idx = list(indices)
if not idx:
return 0
sql += " AND array_index IN ({})".format(",".join("?" * len(idx)))
params.extend(idx)
cur = self.conn.execute(sql, params)
self.conn.commit()
return cur.rowcount

@_retry_on_locked
def update_task_running(self, instance_id: int) -> None:
Expand Down
Loading
Loading