-
Notifications
You must be signed in to change notification settings - Fork 4.8k
resourcewatch: harden watch loop to prevent thread exhaustion #30944
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,9 @@ package observe | |
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "math/rand" | ||
| "time" | ||
|
|
||
| "github.com/go-logr/logr" | ||
|
|
@@ -20,13 +22,26 @@ type resourceMeta struct { | |
| lastObserved *unstructured.Unstructured | ||
| } | ||
|
|
||
| var ( | ||
| errWatchClosed = errors.New("resource watch closed") | ||
| errWatchErrorEvent = errors.New("resource watch error event") | ||
| errUnexpectedObject = errors.New("unexpected watch object type") | ||
| ) | ||
|
|
||
| const ( | ||
| notFoundRetryDelay = 5 * time.Second | ||
| minRetryDelay = 500 * time.Millisecond | ||
| maxRetryDelay = 30 * time.Second | ||
| ) | ||
|
|
||
| // ObserveResource monitors a Kubernetes resource for changes | ||
| func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) { | ||
| log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource) | ||
|
|
||
| resourceClient := client.Resource(gvr) | ||
|
|
||
| observedResources := make(map[types.UID]*resourceMeta) | ||
| retryAttempt := 0 | ||
|
|
||
| for { | ||
| select { | ||
|
|
@@ -36,8 +51,81 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam | |
| } | ||
|
|
||
| if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil { | ||
| log.Error(err, "failed to list and watch resource") | ||
| if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { | ||
| return | ||
| } | ||
|
|
||
| retryDelay := nextRetryDelay(err, retryAttempt) | ||
| log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay) | ||
|
|
||
| if !waitForRetry(ctx, retryDelay) { | ||
| return | ||
| } | ||
|
|
||
| retryAttempt++ | ||
| continue | ||
| } | ||
|
|
||
| // If a watch cycle ends cleanly, start retries from the base delay. | ||
| retryAttempt = 0 | ||
|
Comment on lines
+58
to
+70
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The increment at Line 65 runs on every failure, but the reset at Line 70 appears to be unreachable now that `listAndWatchResource` seems to return a non-nil error on every exit path (watch close returns `errWatchClosed`, NotFound is returned instead of `nil`) — please confirm; if so, `retryAttempt` is never reset and later transient failures start from an inflated backoff. 🤖 Prompt for AI Agents |
||
| } | ||
| } | ||
|
|
||
| func nextRetryDelay(err error, retryAttempt int) time.Duration { | ||
| if apierrors.IsNotFound(err) { | ||
| return notFoundRetryDelay | ||
| } | ||
|
|
||
| backoff := minRetryDelay | ||
| for i := 0; i < retryAttempt; i++ { | ||
| if backoff >= maxRetryDelay/2 { | ||
| backoff = maxRetryDelay | ||
| break | ||
| } | ||
| backoff *= 2 | ||
| } | ||
| if backoff > maxRetryDelay { | ||
| backoff = maxRetryDelay | ||
| } | ||
|
|
||
| jitter := backoff / 4 | ||
| if jitter > 0 { | ||
| jitterDelta := time.Duration(rand.Int63n(int64(2*jitter)+1)) - jitter | ||
| backoff += jitterDelta | ||
| } | ||
| if backoff < minRetryDelay { | ||
| backoff = minRetryDelay | ||
| } | ||
| if backoff > maxRetryDelay { | ||
| backoff = maxRetryDelay | ||
| } | ||
| return backoff | ||
| } | ||
|
|
||
| func waitForRetry(ctx context.Context, delay time.Duration) bool { | ||
| timer := time.NewTimer(delay) | ||
| defer timer.Stop() | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return false | ||
| case <-timer.C: | ||
| return true | ||
| } | ||
| } | ||
|
|
||
| func retryReason(err error) string { | ||
| switch { | ||
| case apierrors.IsNotFound(err): | ||
| return "listNotFound" | ||
| case errors.Is(err, errWatchClosed): | ||
| return "watchClosed" | ||
| case errors.Is(err, errWatchErrorEvent): | ||
| return "watchError" | ||
| case errors.Is(err, errUnexpectedObject): | ||
| return "decodeError" | ||
| default: | ||
| return "listOrWatchError" | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -46,12 +134,9 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N | |
| if err != nil { | ||
| // List returns a NotFound error if the resource doesn't exist. We | ||
| // expect this to happen during cluster installation before CRDs are | ||
| // admitted. Poll at 5 second intervals if this happens to avoid | ||
| // spamming api-server or the logs. | ||
| // admitted. | ||
| if apierrors.IsNotFound(err) { | ||
| log.Info("Resource not found, polling") | ||
| time.Sleep(5 * time.Second) | ||
| return nil | ||
| log.Info("Resource not found") | ||
| } | ||
| return err | ||
| } | ||
|
|
@@ -62,6 +147,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N | |
| if err != nil { | ||
| return fmt.Errorf("failed to watch resource: %w", err) | ||
| } | ||
| defer resourceWatch.Stop() | ||
|
|
||
| resultChan := resourceWatch.ResultChan() | ||
| for { | ||
|
|
@@ -70,23 +156,36 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N | |
| return ctx.Err() | ||
| case observation, ok := <-resultChan: | ||
| if !ok { | ||
| log.Info("Resource watch closed") | ||
| return nil | ||
| return errWatchClosed | ||
| } | ||
|
|
||
| switch observation.Type { | ||
| case watch.Bookmark: | ||
| // Bookmarks are periodic progress notifications; no state change to emit. | ||
| continue | ||
| case watch.Error: | ||
| status, ok := observation.Object.(*metav1.Status) | ||
| if !ok { | ||
| return fmt.Errorf("%w: %T", errWatchErrorEvent, observation.Object) | ||
| } | ||
| return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message) | ||
| case watch.Added, watch.Modified, watch.Deleted: | ||
| // handled below | ||
| default: | ||
| log.Info("Unhandled watch event", "type", observation.Type) | ||
| continue | ||
| } | ||
|
|
||
| object, ok := observation.Object.(*unstructured.Unstructured) | ||
| if !ok { | ||
| return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object) | ||
| return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) | ||
| } | ||
|
|
||
| switch observation.Type { | ||
| case watch.Added: | ||
| case watch.Modified: | ||
| case watch.Added, watch.Modified: | ||
| emitUpdate(observedResources, gvr, object, resourceC) | ||
| case watch.Deleted: | ||
| emitDelete(observedResources, gvr, object, resourceC) | ||
| default: | ||
| log.Info("Unhandled watch event", "type", observation.Type) | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't retry terminal decode/contract failures forever.
This loop now backs off and retries every non-context error, including malformed watch payloads. Those won't self-heal, so the observer goroutine never returns;
pkg/resourcewatch/observe/source.go:113-131 only closes `finished` after every observer exits. Keep retries for transient list/watch failures, but treat decode/contract errors as terminal.

Possible fix:
if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } + if errors.Is(err, errUnexpectedObject) { + log.Error(err, "terminal resource watch failure") + return + } retryDelay := nextRetryDelay(err, retryAttempt) log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)case watch.Error: status, ok := observation.Object.(*metav1.Status) if !ok { - return fmt.Errorf("%w: %T", errWatchErrorEvent, observation.Object) + return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) }Also applies to: 166-170, 179-181
🤖 Prompt for AI Agents