Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 106 additions & 13 deletions pkg/resourcewatch/observe/observe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package observe

import (
"context"
"errors"
"fmt"
"math/rand/v2"
"time"

"github.com/go-logr/logr"
Expand All @@ -20,13 +22,26 @@ type resourceMeta struct {
lastObserved *unstructured.Unstructured
}

// Sentinel errors classifying why a list/watch cycle ended, so the retry
// loop can label the failure (see retryReason) and decide whether a retry
// is worthwhile.
var (
	errWatchClosed      = errors.New("resource watch closed")
	errWatchErrorEvent  = errors.New("resource watch error event")
	errUnexpectedObject = errors.New("unexpected watch object type")
)

// Retry pacing for the list/watch loop. NotFound is polled at a fixed
// interval (expected during cluster installation before CRDs are admitted);
// other failures back off exponentially between minRetryDelay and
// maxRetryDelay.
const (
	notFoundRetryDelay = 5 * time.Second
	minRetryDelay      = 500 * time.Millisecond
	maxRetryDelay      = 30 * time.Second
)

// ObserveResource monitors a Kubernetes resource for changes
func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource)

resourceClient := client.Resource(gvr)

observedResources := make(map[types.UID]*resourceMeta)
retryAttempt := 0

for {
select {
Expand All @@ -36,8 +51,73 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam
}

if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
log.Error(err, "failed to list and watch resource")
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
if errors.Is(err, errUnexpectedObject) {
log.Error(err, "terminal resource watch failure")
return
}

retryDelay := nextRetryDelay(err, retryAttempt)
log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)

if !waitForRetry(ctx, retryDelay) {
return
}

retryAttempt++
continue
}

// If a watch cycle ends cleanly, start retries from the base delay.
retryAttempt = 0
}
}

// nextRetryDelay returns how long to wait before the next list/watch
// attempt. NotFound errors use a fixed poll interval; all other errors get
// exponential backoff (doubling per attempt, capped at maxRetryDelay) with
// ±25% jitter, clamped to [minRetryDelay, maxRetryDelay].
func nextRetryDelay(err error, retryAttempt int) time.Duration {
	if apierrors.IsNotFound(err) {
		return notFoundRetryDelay
	}

	delay := minRetryDelay
	for i := 0; i < retryAttempt; i++ {
		delay = min(delay*2, maxRetryDelay)
	}

	// Add a uniformly distributed offset in [-25%, +25%] of the base delay
	// so concurrent watchers don't retry in lockstep.
	if quarter := delay / 4; quarter > 0 {
		delay += rand.N(2*quarter+1) - quarter
	}

	if delay > maxRetryDelay {
		delay = maxRetryDelay
	}
	if delay < minRetryDelay {
		delay = minRetryDelay
	}
	return delay
}

func waitForRetry(ctx context.Context, delay time.Duration) bool {
timer := time.NewTimer(delay)
defer timer.Stop()

select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}

// retryReason maps a list/watch failure to a short label used as a
// structured logging value, letting operators distinguish expired watches
// from API-server errors and decode failures.
func retryReason(err error) string {
	if apierrors.IsNotFound(err) {
		return "listNotFound"
	}
	if errors.Is(err, errWatchClosed) {
		return "watchClosed"
	}
	if errors.Is(err, errWatchErrorEvent) {
		return "watchError"
	}
	if errors.Is(err, errUnexpectedObject) {
		return "decodeError"
	}
	return "listOrWatchError"
}

Expand All @@ -46,12 +126,9 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
if err != nil {
// List returns a NotFound error if the resource doesn't exist. We
// expect this to happen during cluster installation before CRDs are
// admitted. Poll at 5 second intervals if this happens to avoid
// spamming api-server or the logs.
// admitted.
if apierrors.IsNotFound(err) {
log.Info("Resource not found, polling")
time.Sleep(5 * time.Second)
return nil
log.Info("Resource not found")
}
return err
}
Expand All @@ -62,6 +139,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
if err != nil {
return fmt.Errorf("failed to watch resource: %w", err)
}
defer resourceWatch.Stop()

resultChan := resourceWatch.ResultChan()
for {
Expand All @@ -70,23 +148,38 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
return ctx.Err()
case observation, ok := <-resultChan:
if !ok {
log.Info("Resource watch closed")
return nil
// Watch channel closed (e.g. watch expired); caller will re-list with backoff.
log.Info("Watch channel closed, will retry")
return errWatchClosed
}

switch observation.Type {
case watch.Bookmark:
// Bookmarks are periodic progress notifications; no state change to emit.
continue
case watch.Error:
status, ok := observation.Object.(*metav1.Status)
if !ok {
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}
return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
case watch.Added, watch.Modified, watch.Deleted:
// handled below
default:
log.Info("Unhandled watch event", "type", observation.Type)
continue
}

object, ok := observation.Object.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object)
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}

switch observation.Type {
case watch.Added:
case watch.Modified:
case watch.Added, watch.Modified:
emitUpdate(observedResources, gvr, object, resourceC)
case watch.Deleted:
emitDelete(observedResources, gvr, object, resourceC)
default:
log.Info("Unhandled watch event", "type", observation.Type)
}
}
}
Expand Down
Loading