diff --git a/pkg/resourcewatch/observe/observe.go b/pkg/resourcewatch/observe/observe.go index 54a99d0a6bd8..c64cfb935c24 100644 --- a/pkg/resourcewatch/observe/observe.go +++ b/pkg/resourcewatch/observe/observe.go @@ -2,7 +2,9 @@ package observe import ( "context" + "errors" "fmt" + "math/rand/v2" "time" "github.com/go-logr/logr" @@ -20,6 +22,18 @@ type resourceMeta struct { lastObserved *unstructured.Unstructured } +var ( + errWatchClosed = errors.New("resource watch closed") + errWatchErrorEvent = errors.New("resource watch error event") + errUnexpectedObject = errors.New("unexpected watch object type") +) + +const ( + notFoundRetryDelay = 5 * time.Second + minRetryDelay = 500 * time.Millisecond + maxRetryDelay = 30 * time.Second +) + // ObserveResource monitors a Kubernetes resource for changes func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) { log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource) @@ -27,6 +41,7 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam resourceClient := client.Resource(gvr) observedResources := make(map[types.UID]*resourceMeta) + retryAttempt := 0 for { select { @@ -36,8 +51,73 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam } if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil { - log.Error(err, "failed to list and watch resource") + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + if errors.Is(err, errUnexpectedObject) { + log.Error(err, "terminal resource watch failure") + return + } + + retryDelay := nextRetryDelay(err, retryAttempt) + log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay) + + if !waitForRetry(ctx, retryDelay) { + return + } + + retryAttempt++ + continue } + + // If a watch cycle ends cleanly, start retries from the base delay. + retryAttempt = 0 + } +} + +func nextRetryDelay(err error, retryAttempt int) time.Duration { + if apierrors.IsNotFound(err) { + return notFoundRetryDelay + } + + backoff := minRetryDelay + for range retryAttempt { + backoff = min(backoff*2, maxRetryDelay) + } + + // Apply ±25% jitter, then clamp to [minRetryDelay, maxRetryDelay] + jitter := backoff / 4 + if jitter > 0 { + jitterDelta := time.Duration(rand.N(int64(2*jitter)+1)) - jitter + backoff += jitterDelta + } + return max(min(backoff, maxRetryDelay), minRetryDelay) +} + +func waitForRetry(ctx context.Context, delay time.Duration) bool { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + return false + case <-timer.C: + return true + } +} + +func retryReason(err error) string { + switch { + case apierrors.IsNotFound(err): + return "listNotFound" + case errors.Is(err, errWatchClosed): + return "watchClosed" + case errors.Is(err, errWatchErrorEvent): + return "watchError" + case errors.Is(err, errUnexpectedObject): + return "decodeError" + default: + return "listOrWatchError" } } @@ -46,12 +126,9 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N if err != nil { // List returns a NotFound error if the resource doesn't exist. We // expect this to happen during cluster installation before CRDs are - // admitted. Poll at 5 second intervals if this happens to avoid - // spamming api-server or the logs. + // admitted. if apierrors.IsNotFound(err) { - log.Info("Resource not found, polling") - time.Sleep(5 * time.Second) - return nil + log.Info("Resource not found") } return err } @@ -62,6 +139,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N if err != nil { return fmt.Errorf("failed to watch resource: %w", err) } + defer resourceWatch.Stop() resultChan := resourceWatch.ResultChan() for { @@ -70,23 +148,38 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N return ctx.Err() case observation, ok := <-resultChan: if !ok { - log.Info("Resource watch closed") - return nil + // Watch channel closed (e.g. watch expired); caller will re-list with backoff. + log.Info("Watch channel closed, will retry") + return errWatchClosed + } + + switch observation.Type { + case watch.Bookmark: + // Bookmarks are periodic progress notifications; no state change to emit. + continue + case watch.Error: + status, ok := observation.Object.(*metav1.Status) + if !ok { + return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) + } + return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message) + case watch.Added, watch.Modified, watch.Deleted: + // handled below + default: + log.Info("Unhandled watch event", "type", observation.Type) + continue } object, ok := observation.Object.(*unstructured.Unstructured) if !ok { - return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object) + return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) } switch observation.Type { - case watch.Added: - case watch.Modified: + case watch.Added, watch.Modified: emitUpdate(observedResources, gvr, object, resourceC) case watch.Deleted: emitDelete(observedResources, gvr, object, resourceC) - default: - log.Info("Unhandled watch event", "type", observation.Type) } } } diff --git a/pkg/resourcewatch/observe/observe_test.go b/pkg/resourcewatch/observe/observe_test.go new file mode 100644 index 000000000000..fb222e2a81a1 --- /dev/null +++ b/pkg/resourcewatch/observe/observe_test.go @@ -0,0 +1,265 @@ +package observe + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/klog/v2" +) + +type fakeNamespaceableResource struct { + listFn func(ctx context.Context, opts v1.ListOptions) (*unstructured.UnstructuredList, error) + watchFn func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) +} + +func (f *fakeNamespaceableResource) Namespace(string) dynamic.ResourceInterface { + return f +} + +func (f *fakeNamespaceableResource) Create(context.Context, *unstructured.Unstructured, v1.CreateOptions, ...string) (*unstructured.Unstructured, error) { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) Update(context.Context, *unstructured.Unstructured, v1.UpdateOptions, ...string) (*unstructured.Unstructured, error) { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) UpdateStatus(context.Context, *unstructured.Unstructured, v1.UpdateOptions) (*unstructured.Unstructured, error) { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) Delete(context.Context, string, v1.DeleteOptions, ...string) error { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) DeleteCollection(context.Context, v1.DeleteOptions, v1.ListOptions) error { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) Get(context.Context, string, v1.GetOptions, ...string) (*unstructured.Unstructured, error) { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) List(ctx context.Context, opts v1.ListOptions) (*unstructured.UnstructuredList, error) { + return f.listFn(ctx, opts) +} + +func (f *fakeNamespaceableResource) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return f.watchFn(ctx, opts) +} + +func (f *fakeNamespaceableResource) Patch(context.Context, string, types.PatchType, []byte, v1.PatchOptions, ...string) (*unstructured.Unstructured, error) { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) Apply(context.Context, string, *unstructured.Unstructured, v1.ApplyOptions, ...string) (*unstructured.Unstructured, error) { + panic("not implemented") +} + +func (f *fakeNamespaceableResource) ApplyStatus(context.Context, string, *unstructured.Unstructured, v1.ApplyOptions) (*unstructured.Unstructured, error) { + panic("not implemented") +} + +type trackingWatch struct { + resultC chan watch.Event + stopped bool +} + +func newTrackingWatch() *trackingWatch { + return &trackingWatch{ + resultC: make(chan watch.Event, 4), + } +} + +func (w *trackingWatch) Stop() { + if w.stopped { + return + } + w.stopped = true + close(w.resultC) +} + +func (w *trackingWatch) ResultChan() <-chan watch.Event { + return w.resultC +} + +func TestListAndWatchResource_StopsWatchAndHandlesErrorEvent(t *testing.T) { + t.Parallel() + + resourceWatch := newTrackingWatch() + resourceWatch.resultC <- watch.Event{ + Type: watch.Error, + Object: &v1.Status{ + Reason: v1.StatusReasonExpired, + Message: "resource version too old", + }, + } + + client := &fakeNamespaceableResource{ + listFn: func(context.Context, v1.ListOptions) (*unstructured.UnstructuredList, error) { + return &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "123", + }, + }, + }, nil + }, + watchFn: func(context.Context, v1.ListOptions) (watch.Interface, error) { + return resourceWatch, nil + }, + } + + err := listAndWatchResource(context.Background(), klog.NewKlogr(), client, schema.GroupVersionResource{Resource: "pods"}, map[types.UID]*resourceMeta{}, make(chan *ResourceObservation, 8)) + if !errors.Is(err, errWatchErrorEvent) { + t.Fatalf("expected watch error event, got: %v", err) + } + if !strings.Contains(err.Error(), "resource version too old") { + t.Fatalf("expected status message in error, got: %v", err) + } + if !resourceWatch.stopped { + t.Fatalf("expected watch.Stop() to be called") + } +} + +func TestNextRetryDelay_BackoffAndNotFound(t *testing.T) { + t.Parallel() + + notFoundWrapped := apierrors.NewNotFound(schema.GroupResource{Group: "apps", Resource: "deployments"}, "example") + if got := nextRetryDelay(notFoundWrapped, 5); got != notFoundRetryDelay { + t.Fatalf("expected not found retry delay %v, got %v", notFoundRetryDelay, got) + } + + first := nextRetryDelay(errors.New("watch failed"), 0) + if first < minRetryDelay || first > minRetryDelay+minRetryDelay/2 { + t.Fatalf("attempt 0 delay out of range: %v", first) + } + + second := nextRetryDelay(errors.New("watch failed"), 1) + if second < minRetryDelay*2-minRetryDelay/2 || second > minRetryDelay*2+minRetryDelay/2 { + t.Fatalf("attempt 1 delay out of range: %v", second) + } + + maxed := nextRetryDelay(errors.New("watch failed"), 50) + if maxed < minRetryDelay || maxed > maxRetryDelay { + t.Fatalf("attempt 50 delay out of range [%v, %v]: %v", minRetryDelay, maxRetryDelay, maxed) + } +} + +func TestListAndWatchResource_AddedEventEmitsObservation(t *testing.T) { + t.Parallel() + + resourceWatch := newTrackingWatch() + resourceWatch.resultC <- watch.Event{ + Type: watch.Added, + Object: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "new-pod", + "uid": "uid-1234", + "resourceVersion": "456", + }, + }, + }, + } + + client := &fakeNamespaceableResource{ + listFn: func(context.Context, v1.ListOptions) (*unstructured.UnstructuredList, error) { + return &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "100", + }, + }, + }, nil + }, + watchFn: func(context.Context, v1.ListOptions) (watch.Interface, error) { + return resourceWatch, nil + }, + } + + resourceC := make(chan *ResourceObservation, 8) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + for { + select { + case obs := <-resourceC: + if obs.ObservationType == ObservationTypeAdd && string(obs.UID) == "uid-1234" { + cancel() + return + } + case <-ctx.Done(): + return + } + } + }() + + gvr := schema.GroupVersionResource{Resource: "pods"} + _ = listAndWatchResource(ctx, klog.NewKlogr(), client, gvr, map[types.UID]*resourceMeta{}, resourceC) + + if ctx.Err() == nil { + t.Fatalf("expected context to be cancelled after receiving Added observation") + } + if !resourceWatch.stopped { + t.Fatalf("expected watch.Stop() to be called") + } +} + +func TestListAndWatchResource_ClosedChannelReturnsError(t *testing.T) { + t.Parallel() + + resourceWatch := newTrackingWatch() + // Close the channel immediately to simulate a watch stream that closes + // without sending any events. + close(resourceWatch.resultC) + resourceWatch.stopped = true + + client := &fakeNamespaceableResource{ + listFn: func(context.Context, v1.ListOptions) (*unstructured.UnstructuredList, error) { + return &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "100", + }, + }, + }, nil + }, + watchFn: func(context.Context, v1.ListOptions) (watch.Interface, error) { + return resourceWatch, nil + }, + } + + err := listAndWatchResource(context.Background(), klog.NewKlogr(), client, schema.GroupVersionResource{Resource: "pods"}, map[types.UID]*resourceMeta{}, make(chan *ResourceObservation, 8)) + if !errors.Is(err, errWatchClosed) { + t.Fatalf("expected errWatchClosed, got: %v", err) + } +} + +func TestWaitForRetry_CancelledContext(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + start := time.Now() + ok := waitForRetry(ctx, 2*time.Second) + if ok { + t.Fatalf("expected waitForRetry to abort on cancelled context") + } + if elapsed := time.Since(start); elapsed > 250*time.Millisecond { + t.Fatalf("expected prompt cancellation, elapsed=%v", elapsed) + } +}