Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/cmn/schema/dag.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@
{ "type": "integer" },
{ "type": "string" }
],
"description": "Maximum number of retry attempts"
"description": "Maximum number of retry attempts for a failed step. Use 0 to disable automatic retries for that step."
},
"interval_sec": {
"oneOf": [
Expand Down Expand Up @@ -937,10 +937,10 @@
"properties": {
"limit": {
"oneOf": [
{ "type": "integer", "minimum": 1 },
{ "type": "string", "pattern": "^0*[1-9][0-9]*$" }
{ "type": "integer", "minimum": 0 },
{ "type": "string", "pattern": "^[0-9]+$" }
],
"description": "Maximum number of retry attempts"
"description": "Maximum number of scheduler-issued DAG retry attempts. Use 0 to disable DAG-level automatic retries."
},
"interval_sec": {
"oneOf": [
Expand Down
66 changes: 66 additions & 0 deletions internal/cmn/schema/dag_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,26 @@ retry_policy:
max_interval_sec: "60"
steps:
- command: echo hi
`,
},
{
name: "LimitZero",
spec: `
name: retryable-dag
retry_policy:
limit: 0
steps:
- command: echo hi
`,
},
{
name: "StringLimitZero",
spec: `
name: retryable-dag
retry_policy:
limit: "0"
steps:
- command: echo hi
`,
},
{
Expand All @@ -411,6 +431,28 @@ retry_policy:
interval_sec: 10
steps:
- command: echo hi
`,
wantErr: "retry_policy",
},
{
name: "RejectsNegativeLimit",
spec: `
name: retryable-dag
retry_policy:
limit: -1
steps:
- command: echo hi
`,
wantErr: "retry_policy",
},
{
name: "RejectsNegativeStringLimit",
spec: `
name: retryable-dag
retry_policy:
limit: "-1"
steps:
- command: echo hi
`,
wantErr: "retry_policy",
},
Expand All @@ -423,6 +465,30 @@ retry_policy:
interval_sec: later
steps:
- command: echo hi
`,
wantErr: "retry_policy",
},
{
name: "RejectsZeroInterval",
spec: `
name: retryable-dag
retry_policy:
limit: 1
interval_sec: 0
steps:
- command: echo hi
`,
wantErr: "retry_policy",
},
{
name: "RejectsZeroMaxInterval",
spec: `
name: retryable-dag
retry_policy:
limit: 1
max_interval_sec: 0
steps:
- command: echo hi
`,
wantErr: "retry_policy",
},
Expand Down
25 changes: 25 additions & 0 deletions internal/core/exec/runstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,31 @@ func TestInitialStatusSnapshotsDAGRetryMetadata(t *testing.T) {
assert.Equal(t, "retry-dag", status.SuspendFlagName)
}

// TestInitialStatusSnapshotsDisabledDAGRetryPolicy builds a DAG whose retry
// policy has Limit set to 0 and expects the status returned by
// exec.InitialStatus to report that limit together with the policy's interval,
// backoff and max interval, plus the DAG's queue and name.
func TestInitialStatusSnapshotsDisabledDAGRetryPolicy(t *testing.T) {
	t.Parallel()

	// A policy with a zero limit but otherwise populated timing fields.
	disabledPolicy := &core.DAGRetryPolicy{
		Limit:       0,
		Interval:    time.Minute,
		Backoff:     0,
		MaxInterval: time.Hour,
	}

	snapshot := exec.InitialStatus(&core.DAG{
		Name:        "retry-disabled-dag",
		Queue:       "shared-queue",
		Location:    "/tmp/retry-disabled-dag.yaml",
		RetryPolicy: disabledPolicy,
	})

	// Retry-related fields are expected to match the policy values verbatim.
	assert.Equal(t, 0, snapshot.AutoRetryLimit)
	assert.Equal(t, time.Minute, snapshot.AutoRetryInterval)
	assert.Equal(t, 0.0, snapshot.AutoRetryBackoff)
	assert.Equal(t, time.Hour, snapshot.AutoRetryMaxInterval)

	// Queue and DAG name are expected in ProcGroup and SuspendFlagName.
	assert.Equal(t, "shared-queue", snapshot.ProcGroup)
	assert.Equal(t, "retry-disabled-dag", snapshot.SuspendFlagName)
}

func TestPendingStepRetriesFromStatus(t *testing.T) {
t.Parallel()

Expand Down
32 changes: 20 additions & 12 deletions internal/core/spec/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func parseDAGRetryInterval(v any) (time.Duration, string, error) {
if v == nil {
return 60 * time.Second, "", nil
}
interval, intervalStr, err := parseConcreteDAGRetryInt("retry_policy.interval_sec", v)
interval, intervalStr, err := parseConcreteDAGRetryInt("retry_policy.interval_sec", v, false)
if err != nil {
return 0, "", err
}
Expand All @@ -1019,7 +1019,7 @@ func parseDAGRetryMaxInterval(v any) (time.Duration, error) {
if v == nil {
return time.Hour, nil
}
seconds, _, err := parseConcreteDAGRetryInt("retry_policy.max_interval_sec", v)
seconds, _, err := parseConcreteDAGRetryInt("retry_policy.max_interval_sec", v, false)
if err != nil {
return 0, err
}
Expand All @@ -1030,31 +1030,39 @@ func parseDAGRetryLimit(v any) (int, error) {
if v == nil {
return 0, core.NewValidationError("retry_policy.limit", nil, fmt.Errorf("limit is required when retry_policy is specified"))
}
limit, _, err := parseConcreteDAGRetryInt("retry_policy.limit", v)
limit, _, err := parseConcreteDAGRetryInt("retry_policy.limit", v, true)
if err != nil {
return 0, err
}
return limit, nil
}

func parseConcreteDAGRetryInt(fieldName string, val any) (int, string, error) {
func parseConcreteDAGRetryInt(fieldName string, val any, allowZero bool) (int, string, error) {
invalidPositiveValue := func(value any) (int, string, error) {
operator := "> 0"
if allowZero {
operator = ">= 0"
}
return 0, "", core.NewValidationError(fieldName, value, fmt.Errorf("%s must be %s", retryFieldLabel(fieldName), operator))
}

switch v := val.(type) {
case int:
if v <= 0 {
return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName)))
if v < 0 || (!allowZero && v == 0) {
return invalidPositiveValue(v)
}
return v, "", nil
case int64:
if v <= 0 {
return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName)))
if v < 0 || (!allowZero && v == 0) {
return invalidPositiveValue(v)
}
if v > math.MaxInt {
return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("value %d exceeds maximum int", v))
}
return int(v), "", nil
case uint64:
if v == 0 {
return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName)))
if !allowZero && v == 0 {
return invalidPositiveValue(v)
}
if v > math.MaxInt {
return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("value %d exceeds maximum int", v))
Expand All @@ -1065,8 +1073,8 @@ func parseConcreteDAGRetryInt(fieldName string, val any) (int, string, error) {
if err != nil {
return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be an integer or numeric string", retryFieldLabel(fieldName)))
}
if parsed <= 0 {
return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName)))
if parsed < 0 || (!allowZero && parsed == 0) {
return invalidPositiveValue(v)
}
return parsed, v, nil
default:
Expand Down
12 changes: 12 additions & 0 deletions internal/core/spec/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,18 @@ func (*mergeTransformer) Transformer(
}
}

if typ == reflect.TypeFor[core.DAGRetryPolicy]() {
// DAG retry policies are configured as a single root object. Replace the
// inherited policy wholesale so limit: 0 can intentionally disable retries.
return func(dst, src reflect.Value) error {
if dst.CanSet() {
dst.Set(src)
}

return nil
}
}

if typ == reflect.TypeFor[core.KubernetesConfig]() {
return func(dst, src reflect.Value) error {
if !dst.CanSet() || !src.IsValid() || src.IsNil() {
Expand Down
126 changes: 126 additions & 0 deletions internal/core/spec/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,30 @@ steps:
require.Equal(t, 60*time.Second, dag.RetryPolicy.Interval)
})

t.Run("BaseConfigDAGRetryPolicyCanBeDisabledByChild", func(t *testing.T) {
t.Parallel()

base := createTempYAMLFile(t, `
retry_policy:
limit: 3
interval_sec: 30
max_interval_sec: 300
`)
child := createTempYAMLFile(t, `
retry_policy:
limit: 0
steps:
- name: step1
command: echo "test"
`)
dag, err := spec.Load(context.Background(), child, spec.WithBaseConfig(base))
require.NoError(t, err)
require.NotNil(t, dag.RetryPolicy)
require.Equal(t, 0, dag.RetryPolicy.Limit)
require.Equal(t, 60*time.Second, dag.RetryPolicy.Interval)
require.Equal(t, time.Hour, dag.RetryPolicy.MaxInterval)
})

t.Run("DAGRetryPolicyNormalization", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1977,6 +2001,38 @@ steps:
MaxInterval: 300 * time.Second,
},
},
{
name: "LimitZeroDefaultsRetryIntervals",
spec: `
name: retryable
retry_policy:
limit: 0
steps:
- command: echo hi
`,
wantPolicy: &core.DAGRetryPolicy{
Limit: 0,
Interval: 60 * time.Second,
Backoff: 0,
MaxInterval: time.Hour,
},
},
{
name: "StringLimitZeroDefaultsRetryIntervals",
spec: `
name: retryable
retry_policy:
limit: "0"
steps:
- command: echo hi
`,
wantPolicy: &core.DAGRetryPolicy{
Limit: 0,
Interval: 60 * time.Second,
Backoff: 0,
MaxInterval: time.Hour,
},
},
{
name: "BackoffFalseUsesFixedInterval",
spec: `
Expand Down Expand Up @@ -2004,6 +2060,28 @@ retry_policy:
limit: three
steps:
- command: echo hi
`,
errContains: "retry_policy.limit",
},
{
name: "RejectsNegativeLimit",
spec: `
name: retryable
retry_policy:
limit: -1
steps:
- command: echo hi
`,
errContains: "retry_policy.limit",
},
{
name: "RejectsNegativeStringLimit",
spec: `
name: retryable
retry_policy:
limit: "-1"
steps:
- command: echo hi
`,
errContains: "retry_policy.limit",
},
Expand All @@ -2016,6 +2094,30 @@ retry_policy:
interval_sec: later
steps:
- command: echo hi
`,
errContains: "retry_policy.interval_sec",
},
{
name: "RejectsZeroInterval",
spec: `
name: retryable
retry_policy:
limit: 1
interval_sec: 0
steps:
- command: echo hi
`,
errContains: "retry_policy.interval_sec",
},
{
name: "RejectsNegativeInterval",
spec: `
name: retryable
retry_policy:
limit: 1
interval_sec: -1
steps:
- command: echo hi
`,
errContains: "retry_policy.interval_sec",
},
Expand All @@ -2032,6 +2134,30 @@ steps:
`,
errContains: "retry_policy.backoff",
},
{
name: "RejectsZeroMaxInterval",
spec: `
name: retryable
retry_policy:
limit: 1
max_interval_sec: 0
steps:
- command: echo hi
`,
errContains: "retry_policy.max_interval_sec",
},
{
name: "RejectsNegativeMaxInterval",
spec: `
name: retryable
retry_policy:
limit: 1
max_interval_sec: -1
steps:
- command: echo hi
`,
errContains: "retry_policy.max_interval_sec",
},
}

for _, tt := range tests {
Expand Down
Loading