diff --git a/internal/cmn/schema/dag.schema.json b/internal/cmn/schema/dag.schema.json index 32d2ec91f..41520dd46 100644 --- a/internal/cmn/schema/dag.schema.json +++ b/internal/cmn/schema/dag.schema.json @@ -892,7 +892,7 @@ { "type": "integer" }, { "type": "string" } ], - "description": "Maximum number of retry attempts" + "description": "Maximum number of retry attempts for a failed step. Use 0 to disable automatic retries for that step." }, "interval_sec": { "oneOf": [ @@ -937,10 +937,10 @@ "properties": { "limit": { "oneOf": [ - { "type": "integer", "minimum": 1 }, - { "type": "string", "pattern": "^0*[1-9][0-9]*$" } + { "type": "integer", "minimum": 0 }, + { "type": "string", "pattern": "^[0-9]+$" } ], - "description": "Maximum number of retry attempts" + "description": "Maximum number of scheduler-issued DAG retry attempts. Use 0 to disable DAG-level automatic retries." }, "interval_sec": { "oneOf": [ diff --git a/internal/cmn/schema/dag_schema_test.go b/internal/cmn/schema/dag_schema_test.go index fa6573a98..65221a262 100644 --- a/internal/cmn/schema/dag_schema_test.go +++ b/internal/cmn/schema/dag_schema_test.go @@ -389,6 +389,26 @@ retry_policy: max_interval_sec: "60" steps: - command: echo hi +`, + }, + { + name: "LimitZero", + spec: ` +name: retryable-dag +retry_policy: + limit: 0 +steps: + - command: echo hi +`, + }, + { + name: "StringLimitZero", + spec: ` +name: retryable-dag +retry_policy: + limit: "0" +steps: + - command: echo hi `, }, { @@ -411,6 +431,28 @@ retry_policy: interval_sec: 10 steps: - command: echo hi +`, + wantErr: "retry_policy", + }, + { + name: "RejectsNegativeLimit", + spec: ` +name: retryable-dag +retry_policy: + limit: -1 +steps: + - command: echo hi +`, + wantErr: "retry_policy", + }, + { + name: "RejectsNegativeStringLimit", + spec: ` +name: retryable-dag +retry_policy: + limit: "-1" +steps: + - command: echo hi `, wantErr: "retry_policy", }, @@ -423,6 +465,30 @@ retry_policy: interval_sec: later steps: - command: echo hi +`, + wantErr: "retry_policy", + }, + { + name: "RejectsZeroInterval", + spec: ` +name: retryable-dag +retry_policy: + limit: 1 + interval_sec: 0 +steps: + - command: echo hi +`, + wantErr: "retry_policy", + }, + { + name: "RejectsZeroMaxInterval", + spec: ` +name: retryable-dag +retry_policy: + limit: 1 + max_interval_sec: 0 +steps: + - command: echo hi `, wantErr: "retry_policy", }, diff --git a/internal/core/exec/runstatus_test.go b/internal/core/exec/runstatus_test.go index 9a629317f..1961342bb 100644 --- a/internal/core/exec/runstatus_test.go +++ b/internal/core/exec/runstatus_test.go @@ -39,6 +39,31 @@ func TestInitialStatusSnapshotsDAGRetryMetadata(t *testing.T) { assert.Equal(t, "retry-dag", status.SuspendFlagName) } +func TestInitialStatusSnapshotsDisabledDAGRetryPolicy(t *testing.T) { + t.Parallel() + + dag := &core.DAG{ + Name: "retry-disabled-dag", + Queue: "shared-queue", + Location: "/tmp/retry-disabled-dag.yaml", + RetryPolicy: &core.DAGRetryPolicy{ + Limit: 0, + Interval: time.Minute, + Backoff: 0, + MaxInterval: time.Hour, + }, + } + + status := exec.InitialStatus(dag) + + assert.Equal(t, 0, status.AutoRetryLimit) + assert.Equal(t, time.Minute, status.AutoRetryInterval) + assert.Equal(t, 0.0, status.AutoRetryBackoff) + assert.Equal(t, time.Hour, status.AutoRetryMaxInterval) + assert.Equal(t, "shared-queue", status.ProcGroup) + assert.Equal(t, "retry-disabled-dag", status.SuspendFlagName) +} + func TestPendingStepRetriesFromStatus(t *testing.T) { t.Parallel() diff --git a/internal/core/spec/dag.go b/internal/core/spec/dag.go index eb76f9f5f..3ad38c87a 100644 --- a/internal/core/spec/dag.go +++ b/internal/core/spec/dag.go @@ -1000,7 +1000,7 @@ func parseDAGRetryInterval(v any) (time.Duration, string, error) { if v == nil { return 60 * time.Second, "", nil } - interval, intervalStr, err := parseConcreteDAGRetryInt("retry_policy.interval_sec", v) + interval, intervalStr, err := parseConcreteDAGRetryInt("retry_policy.interval_sec", v, false) if err != nil { return 0, "", err } @@ -1019,7 +1019,7 @@ func parseDAGRetryMaxInterval(v any) (time.Duration, error) { if v == nil { return time.Hour, nil } - seconds, _, err := parseConcreteDAGRetryInt("retry_policy.max_interval_sec", v) + seconds, _, err := parseConcreteDAGRetryInt("retry_policy.max_interval_sec", v, false) if err != nil { return 0, err } @@ -1030,31 +1030,39 @@ func parseDAGRetryLimit(v any) (int, error) { if v == nil { return 0, core.NewValidationError("retry_policy.limit", nil, fmt.Errorf("limit is required when retry_policy is specified")) } - limit, _, err := parseConcreteDAGRetryInt("retry_policy.limit", v) + limit, _, err := parseConcreteDAGRetryInt("retry_policy.limit", v, true) if err != nil { return 0, err } return limit, nil } -func parseConcreteDAGRetryInt(fieldName string, val any) (int, string, error) { +func parseConcreteDAGRetryInt(fieldName string, val any, allowZero bool) (int, string, error) { + invalidPositiveValue := func(value any) (int, string, error) { + operator := "> 0" + if allowZero { + operator = ">= 0" + } + return 0, "", core.NewValidationError(fieldName, value, fmt.Errorf("%s must be %s", retryFieldLabel(fieldName), operator)) + } + switch v := val.(type) { case int: - if v <= 0 { - return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName))) + if v < 0 || (!allowZero && v == 0) { + return invalidPositiveValue(v) } return v, "", nil case int64: - if v <= 0 { - return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName))) + if v < 0 || (!allowZero && v == 0) { + return invalidPositiveValue(v) } if v > math.MaxInt { return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("value %d exceeds maximum int", v)) } return int(v), "", nil case uint64: - if v == 0 { - return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName))) + if !allowZero && v == 0 { + return invalidPositiveValue(v) } if v > math.MaxInt { return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("value %d exceeds maximum int", v)) @@ -1065,8 +1073,8 @@ func parseConcreteDAGRetryInt(fieldName string, val any) (int, string, error) { if err != nil { return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be an integer or numeric string", retryFieldLabel(fieldName))) } - if parsed <= 0 { - return 0, "", core.NewValidationError(fieldName, v, fmt.Errorf("%s must be > 0", retryFieldLabel(fieldName))) + if parsed < 0 || (!allowZero && parsed == 0) { + return invalidPositiveValue(v) } return parsed, v, nil default: diff --git a/internal/core/spec/loader.go b/internal/core/spec/loader.go index b202a8f8c..66faab021 100644 --- a/internal/core/spec/loader.go +++ b/internal/core/spec/loader.go @@ -737,6 +737,18 @@ func (*mergeTransformer) Transformer( } } + if typ == reflect.TypeFor[core.DAGRetryPolicy]() { + // DAG retry policies are configured as a single root object. Replace the + // inherited policy wholesale so limit: 0 can intentionally disable retries. + return func(dst, src reflect.Value) error { + if dst.CanSet() { + dst.Set(src) + } + + return nil + } + } + if typ == reflect.TypeFor[core.KubernetesConfig]() { return func(dst, src reflect.Value) error { if !dst.CanSet() || !src.IsValid() || src.IsNil() { diff --git a/internal/core/spec/loader_test.go b/internal/core/spec/loader_test.go index 52fc86abc..cf759e43f 100644 --- a/internal/core/spec/loader_test.go +++ b/internal/core/spec/loader_test.go @@ -1948,6 +1948,30 @@ steps: require.Equal(t, 60*time.Second, dag.RetryPolicy.Interval) }) + t.Run("BaseConfigDAGRetryPolicyCanBeDisabledByChild", func(t *testing.T) { + t.Parallel() + + base := createTempYAMLFile(t, ` +retry_policy: + limit: 3 + interval_sec: 30 + max_interval_sec: 300 +`) + child := createTempYAMLFile(t, ` +retry_policy: + limit: 0 +steps: + - name: step1 + command: echo "test" +`) + dag, err := spec.Load(context.Background(), child, spec.WithBaseConfig(base)) + require.NoError(t, err) + require.NotNil(t, dag.RetryPolicy) + require.Equal(t, 0, dag.RetryPolicy.Limit) + require.Equal(t, 60*time.Second, dag.RetryPolicy.Interval) + require.Equal(t, time.Hour, dag.RetryPolicy.MaxInterval) + }) + t.Run("DAGRetryPolicyNormalization", func(t *testing.T) { t.Parallel() @@ -1977,6 +2001,38 @@ steps: MaxInterval: 300 * time.Second, }, }, + { + name: "LimitZeroDefaultsRetryIntervals", + spec: ` +name: retryable +retry_policy: + limit: 0 +steps: + - command: echo hi +`, + wantPolicy: &core.DAGRetryPolicy{ + Limit: 0, + Interval: 60 * time.Second, + Backoff: 0, + MaxInterval: time.Hour, + }, + }, + { + name: "StringLimitZeroDefaultsRetryIntervals", + spec: ` +name: retryable +retry_policy: + limit: "0" +steps: + - command: echo hi +`, + wantPolicy: &core.DAGRetryPolicy{ + Limit: 0, + Interval: 60 * time.Second, + Backoff: 0, + MaxInterval: time.Hour, + }, + }, { name: "BackoffFalseUsesFixedInterval", spec: ` @@ -2004,6 +2060,28 @@ retry_policy: limit: three steps: - command: echo hi +`, + errContains: "retry_policy.limit", + }, + { + name: "RejectsNegativeLimit", + spec: ` +name: retryable +retry_policy: + limit: -1 +steps: + - command: echo hi +`, + errContains: "retry_policy.limit", + }, + { + name: "RejectsNegativeStringLimit", + spec: ` +name: retryable +retry_policy: + limit: "-1" +steps: + - command: echo hi `, errContains: "retry_policy.limit", }, @@ -2016,6 +2094,30 @@ retry_policy: interval_sec: later steps: - command: echo hi +`, + errContains: "retry_policy.interval_sec", + }, + { + name: "RejectsZeroInterval", + spec: ` +name: retryable +retry_policy: + limit: 1 + interval_sec: 0 +steps: + - command: echo hi +`, + errContains: "retry_policy.interval_sec", + }, + { + name: "RejectsNegativeInterval", + spec: ` +name: retryable +retry_policy: + limit: 1 + interval_sec: -1 +steps: + - command: echo hi `, errContains: "retry_policy.interval_sec", }, @@ -2032,6 +2134,30 @@ steps: `, errContains: "retry_policy.backoff", }, + { + name: "RejectsZeroMaxInterval", + spec: ` +name: retryable +retry_policy: + limit: 1 + max_interval_sec: 0 +steps: + - command: echo hi +`, + errContains: "retry_policy.max_interval_sec", + }, + { + name: "RejectsNegativeMaxInterval", + spec: ` +name: retryable +retry_policy: + limit: 1 + max_interval_sec: -1 +steps: + - command: echo hi +`, + errContains: "retry_policy.max_interval_sec", + }, } for _, tt := range tests { diff --git a/internal/intg/queue/queue_test.go b/internal/intg/queue/queue_test.go index 628d420a7..0c50cb2e7 100644 --- a/internal/intg/queue/queue_test.go +++ b/internal/intg/queue/queue_test.go @@ -16,6 +16,7 @@ import ( "github.com/dagucloud/dagu/internal/cmn/stringutil" "github.com/dagucloud/dagu/internal/core" "github.com/dagucloud/dagu/internal/core/exec" + "github.com/dagucloud/dagu/internal/core/spec" "github.com/dagucloud/dagu/internal/service/coordinator" "github.com/dagucloud/dagu/internal/service/scheduler" "github.com/dagucloud/dagu/internal/test" @@ -342,6 +343,67 @@ steps: assert.Equal(t, 1, latest.AutoRetryCount) }) + t.Run("DisabledByChildSkipsInheritedBaseRetryPolicy", func(t *testing.T) { + f := newFixture(t, ` +type: graph +name: retry-disabled-dag +queue: retry-disabled-queue +retry_policy: + limit: 0 +steps: + - id: retry_step + command: echo retried +`, WithQueue("retry-disabled-queue"), WithGlobalQueue("retry-disabled-queue", 1)) + + require.NoError(t, os.WriteFile(f.th.Config.Paths.BaseConfig, []byte(` +retry_policy: + limit: 1 + interval_sec: 1 + backoff: false + max_interval_sec: 1 +`), 0600)) + + dag, err := spec.Load(f.th.Context, f.dag.Location, spec.WithBaseConfig(f.th.Config.Paths.BaseConfig)) + require.NoError(t, err) + f.dag = dag + require.NotNil(t, f.dag.RetryPolicy) + require.Equal(t, 0, f.dag.RetryPolicy.Limit) + + failedAt := time.Now().UTC().Add(-30 * time.Second) + runID := f.FailedRunWithMetadata(runStatusOptions{ + StartedAt: failedAt.Add(-5 * time.Second), + FinishedAt: failedAt, + ScheduleTime: failedAt.Add(-time.Minute), + TriggerType: core.TriggerTypeScheduler, + }) + originalStatus := f.MustStatus(runID) + originalAttemptID := originalStatus.AttemptID + require.Equal(t, 0, originalStatus.AutoRetryLimit) + + f.StartScheduler(10 * time.Second) + defer f.Stop() + + require.Never(t, func() bool { + status, err := f.Status(runID) + if err != nil { + return false + } + return status.AttemptID != originalAttemptID || + status.Status != core.Failed || + status.AutoRetryCount != 0 + }, 3*time.Second, 100*time.Millisecond) + + latest := f.MustStatus(runID) + assert.Equal(t, core.Failed, latest.Status) + assert.Equal(t, originalAttemptID, latest.AttemptID) + assert.Equal(t, 0, latest.AutoRetryCount) + assert.Equal(t, 0, latest.AutoRetryLimit) + + items, err := f.th.QueueStore.List(f.th.Context, "retry-disabled-queue") + require.NoError(t, err) + assert.Empty(t, items) + }) + t.Run("NewerScheduledRunDoesNotSuppressRetry", func(t *testing.T) { markerPath := filepath.Join(t.TempDir(), "failure.marker") f := newFixture(t, fmt.Sprintf(` diff --git a/internal/service/scheduler/retry_scanner_test.go b/internal/service/scheduler/retry_scanner_test.go index e4654199d..7496a5dc9 100644 --- a/internal/service/scheduler/retry_scanner_test.go +++ b/internal/service/scheduler/retry_scanner_test.go @@ -50,6 +50,16 @@ func TestRetryScannerEvaluateRetryDecision(t *testing.T) { metadata: mustRetryMetadataFromDAG(t, baseDAG), reason: "retry_exhausted", }, + { + name: "RetryLimitZeroSkips", + status: cloneRetryStatus(baseStatus), + metadata: dagRetryMetadata{ + limit: 0, + interval: time.Minute, + maxInterval: 10 * time.Minute, + }, + reason: "retry_policy_missing", + }, { name: "MissingFinishedAtFallsBackToCreatedAt", status: withCreatedAt(withFinishedAt(baseStatus, ""), now.Add(-2*time.Minute).UnixMilli()), @@ -228,6 +238,54 @@ func TestRetryScannerScanEnqueuesRetry(t *testing.T) { queueStore.AssertExpectations(t) } +func TestRetryScannerScanSkipsDisabledRetryPolicy(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 3, 14, 14, 0, 0, 0, time.UTC) + dag := &core.DAG{ + Name: "retry-disabled-dag", + Location: "/tmp/retry-disabled-dag.yaml", + RetryPolicy: &core.DAGRetryPolicy{ + Limit: 0, + Interval: time.Minute, + Backoff: 0, + MaxInterval: 10 * time.Minute, + }, + } + status := &exec.DAGRunStatus{ + Name: dag.Name, + DAGRunID: "run-1", + AttemptID: "att-1", + Status: core.Failed, + AutoRetryCount: 0, + FinishedAt: now.Add(-3 * time.Minute).Format(time.RFC3339), + ScheduleTime: now.Add(-10 * time.Minute).Format(time.RFC3339), + } + store := newRetryScannerStore(dag, status) + queueStore := &exec.MockQueueStore{} + + scanner, err := NewRetryScanner( + store, + queueStore, + nil, + 24*time.Hour, + func() time.Time { return now }, + ) + require.NoError(t, err) + + err = scanner.scan(context.Background()) + require.NoError(t, err) + + latest := store.mustStatus(status.DAGRun()) + assert.Equal(t, core.Failed, latest.Status) + assert.Equal(t, 0, latest.AutoRetryCount) + assert.Equal(t, 0, latest.AutoRetryLimit) + assert.Equal(t, 0, store.latestAttemptCalls) + assert.Len(t, store.listCalls, 1) + assert.Equal(t, 0, store.findAttemptCalls) + queueStore.AssertNotCalled(t, "Enqueue", mock.Anything, dag.ProcGroup(), exec.QueuePriorityLow, status.DAGRun()) +} + func TestRetryScannerScanEnqueuesRetryWithoutLiveTargets(t *testing.T) { t.Parallel() diff --git a/skills/dagu/references/schema.md b/skills/dagu/references/schema.md index e27068947..1f159d8d9 100644 --- a/skills/dagu/references/schema.md +++ b/skills/dagu/references/schema.md @@ -33,6 +33,7 @@ | `hist_retention_days` | int | 30 | History retention | | `queue` | string | — | Queue name for concurrency control. Define queues in global config with `max_concurrency`. | | `preconditions` | array | — | DAG-level preconditions (`condition`, `expected`, `negate`) | +| `retry_policy` | object | — | DAG-level automatic retry policy: `{limit, interval_sec, backoff, max_interval_sec}`. `limit: 0` disables automatic DAG retries. | | `handler_on` | object | — | Event handlers: `init`, `success`, `failure`, `abort`, `exit`, `wait` (each is a step definition) | | `smtp` | object | — | SMTP config: `host`, `port`, `username`, `password` | | `error_mail` | object | — | Error mail: `from`, `to`, `prefix`, `attach_logs` | @@ -58,6 +59,7 @@ - `params:` values are resolved as strings. - `handler_on` keys are exactly `init`, `success`, `failure`, `abort`, `exit`, and `wait`. - `container:` is polymorphic: a string targets an existing container, while an object creates a new one. +- Top-level `retry_policy.limit: 0` disables DAG-level automatic retries. ## Step-Level Fields