Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 112 additions & 13 deletions pkg/resourcewatch/observe/observe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package observe

import (
"context"
"errors"
"fmt"
"math/rand"
"time"

"github.com/go-logr/logr"
Expand All @@ -20,13 +22,26 @@ type resourceMeta struct {
lastObserved *unstructured.Unstructured
}

// Sentinel errors that classify why a list/watch cycle ended; they are
// wrapped with %w at the failure site and matched with errors.Is (see
// retryReason) so the retry loop can log a structured reason.
var (
errWatchClosed = errors.New("resource watch closed")
errWatchErrorEvent = errors.New("resource watch error event")
errUnexpectedObject = errors.New("unexpected watch object type")
)

// Retry tuning for the list/watch loop (consumed by nextRetryDelay).
const (
// Fixed poll interval used when List returns NotFound — expected during
// cluster installation before the CRD is admitted.
notFoundRetryDelay = 5 * time.Second
// Base (first-attempt) delay for the exponential backoff.
minRetryDelay = 500 * time.Millisecond
// Upper bound the backoff is clamped to, before and after jitter.
maxRetryDelay = 30 * time.Second
)

// ObserveResource monitors a Kubernetes resource for changes
func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource)

resourceClient := client.Resource(gvr)

observedResources := make(map[types.UID]*resourceMeta)
retryAttempt := 0

for {
select {
Expand All @@ -36,8 +51,81 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam
}

if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
log.Error(err, "failed to list and watch resource")
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}

retryDelay := nextRetryDelay(err, retryAttempt)
log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)

if !waitForRetry(ctx, retryDelay) {
return
}

retryAttempt++
continue
Comment on lines 53 to +66
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't retry terminal decode/contract failures forever.

This loop now backs off and retries every non-context error, including malformed watch payloads. Those won't self-heal, so the observer goroutine never returns; pkg/resourcewatch/observe/source.go:113-131 only closes finished after every observer exits. Keep retries for transient list/watch failures, but treat decode/contract errors as terminal.

Possible fix
 		if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 				return
 			}
+			if errors.Is(err, errUnexpectedObject) {
+				log.Error(err, "terminal resource watch failure")
+				return
+			}

 			retryDelay := nextRetryDelay(err, retryAttempt)
 			log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)
 			case watch.Error:
 				status, ok := observation.Object.(*metav1.Status)
 				if !ok {
-					return fmt.Errorf("%w: %T", errWatchErrorEvent, observation.Object)
+					return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
 				}

Also applies to: 166-170, 179-181

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/resourcewatch/observe/observe.go` around lines 53 - 66, The loop around
listAndWatchResource currently retries on all non-context errors; change it to
treat decode/contract failures as terminal by detecting those via
retryReason(err) (or the specific error types your code uses for decode/contract
failures) and returning immediately instead of backing off and retrying.
Concretely, after the listAndWatchResource error check but before computing
nextRetryDelay/retrying, add a branch: if retryReason(err) indicates a
decode/contract failure (or errors.Is(err, ErrDecode) / errors.Is(err,
ErrContract)), log the terminal error and return; otherwise continue with
nextRetryDelay, waitForRetry, and retryAttempt increment as before. Apply the
same change to the other retry loops in this file that use
nextRetryDelay/retryAttempt (the other list/watch retry sites using
retryReason).

}

// If a watch cycle ends cleanly, start retries from the base delay.
retryAttempt = 0
Comment on lines +58 to +70
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

retryAttempt never resets with the current control flow.

The increment at Line 65 runs on every failure, but the reset at Line 70 is unreachable now that listAndWatchResource only returns errors. After enough reconnects, later watch renewals stay pinned near maxRetryDelay even after long healthy periods.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/resourcewatch/observe/observe.go` around lines 58 - 70, The loop never
resets retryAttempt because listAndWatchResource currently only returns errors,
so the retryAttempt++ on error keeps growing; change the control flow so that
when listAndWatchResource finishes a clean watch cycle it returns nil (or
another success signal) and the caller sets retryAttempt = 0 before continuing;
keep the existing logic that increments retryAttempt only on error paths (when
err != nil), call nextRetryDelay/retryReason/waitForRetry only for errors, and
ensure waitForRetry(false) still returns early—update listAndWatchResource and
the loop around it (symbols: listAndWatchResource, retryAttempt, nextRetryDelay,
retryReason, waitForRetry) accordingly so healthy long-running periods reset the
retry back to base delay.

}
}

// nextRetryDelay computes how long to wait before retrying a failed
// list/watch cycle. NotFound errors use a fixed poll interval (the CRD
// may simply not be admitted yet); all other errors get exponential
// backoff starting at minRetryDelay, capped at maxRetryDelay, with
// ±25% random jitter to avoid synchronized reconnect storms.
func nextRetryDelay(err error, retryAttempt int) time.Duration {
	if apierrors.IsNotFound(err) {
		return notFoundRetryDelay
	}

	// delay = minRetryDelay * 2^retryAttempt, saturating at maxRetryDelay.
	delay := minRetryDelay
	for attempt := 0; attempt < retryAttempt && delay < maxRetryDelay; attempt++ {
		delay *= 2
	}
	if delay > maxRetryDelay {
		delay = maxRetryDelay
	}

	// Apply jitter uniformly drawn from [-delay/4, +delay/4].
	if spread := delay / 4; spread > 0 {
		delay += time.Duration(rand.Int63n(int64(2*spread)+1)) - spread
	}

	// Re-clamp so jitter never pushes us outside the configured bounds.
	if delay < minRetryDelay {
		return minRetryDelay
	}
	if delay > maxRetryDelay {
		return maxRetryDelay
	}
	return delay
}

func waitForRetry(ctx context.Context, delay time.Duration) bool {
timer := time.NewTimer(delay)
defer timer.Stop()

select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}

// retryReason maps a list/watch failure to a short label used as the
// "retryReason" key in retry log lines.
func retryReason(err error) string {
	if apierrors.IsNotFound(err) {
		return "listNotFound"
	}
	if errors.Is(err, errWatchClosed) {
		return "watchClosed"
	}
	if errors.Is(err, errWatchErrorEvent) {
		return "watchError"
	}
	if errors.Is(err, errUnexpectedObject) {
		return "decodeError"
	}
	return "listOrWatchError"
}

Expand All @@ -46,12 +134,9 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
if err != nil {
// List returns a NotFound error if the resource doesn't exist. We
// expect this to happen during cluster installation before CRDs are
// admitted. Poll at 5 second intervals if this happens to avoid
// spamming api-server or the logs.
// admitted.
if apierrors.IsNotFound(err) {
log.Info("Resource not found, polling")
time.Sleep(5 * time.Second)
return nil
log.Info("Resource not found")
}
return err
}
Expand All @@ -62,6 +147,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
if err != nil {
return fmt.Errorf("failed to watch resource: %w", err)
}
defer resourceWatch.Stop()

resultChan := resourceWatch.ResultChan()
for {
Expand All @@ -70,23 +156,36 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
return ctx.Err()
case observation, ok := <-resultChan:
if !ok {
log.Info("Resource watch closed")
return nil
return errWatchClosed
}

switch observation.Type {
case watch.Bookmark:
// Bookmarks are periodic progress notifications; no state change to emit.
continue
case watch.Error:
status, ok := observation.Object.(*metav1.Status)
if !ok {
return fmt.Errorf("%w: %T", errWatchErrorEvent, observation.Object)
}
return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
case watch.Added, watch.Modified, watch.Deleted:
// handled below
default:
log.Info("Unhandled watch event", "type", observation.Type)
continue
}

object, ok := observation.Object.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object)
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}

switch observation.Type {
case watch.Added:
case watch.Modified:
case watch.Added, watch.Modified:
emitUpdate(observedResources, gvr, object, resourceC)
case watch.Deleted:
emitDelete(observedResources, gvr, object, resourceC)
default:
log.Info("Unhandled watch event", "type", observation.Type)
}
}
}
Expand Down
Loading