From ca12c8c38b5373461545ea62476524ffb0240dc6 Mon Sep 17 00:00:00 2001 From: Sylvain Boily <4981802+djsly@users.noreply.github.com> Date: Fri, 6 Feb 2026 20:11:24 -0500 Subject: [PATCH] trying new listWatch logic for WaitNodeReady --- e2e/kube.go | 127 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 96 insertions(+), 31 deletions(-) diff --git a/e2e/kube.go b/e2e/kube.go index f8efd38c28e..c8314faa35d 100644 --- a/e2e/kube.go +++ b/e2e/kube.go @@ -11,7 +11,6 @@ import ( "github.com/Azure/agentbaker/e2e/config" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" - "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -143,6 +142,20 @@ func (k *Kubeclient) WaitUntilPodRunning(ctx context.Context, namespace string, return k.WaitUntilPodRunningWithRetry(ctx, namespace, labelSelector, fieldSelector, 0) } +// isNodeReady checks if a node matches the vmssName prefix and is in Ready condition. +// Returns (nodeName, isReady) where nodeName is non-empty if the node matches. +func isNodeReady(node *corev1.Node, vmssName string) (string, bool) { + if !strings.HasPrefix(node.Name, vmssName) { + return "", false + } + for _, cond := range node.Status.Conditions { + if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue { + return node.Name, true + } + } + return node.Name, false +} + func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, t testing.TB, vmssName string) string { startTime := time.Now() t.Logf("waiting for node %s to be ready in k8s API", vmssName) @@ -150,53 +163,105 @@ func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, t testing.TB, vmssN t.Logf("waited for node %s to be ready in k8s API for %s", vmssName, time.Since(startTime)) }() - var node *corev1.Node = nil - watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{}) - require.NoError(t, err, "failed to start watching nodes") - defer watcher.Stop() + var lastSeenNode *corev1.Node - for event := range watcher.ResultChan() { - if event.Type != watch.Added && event.Type != watch.Modified { - continue + for { + // Check if context is already cancelled + if ctx.Err() != nil { + break } - var nodeFromEvent *corev1.Node - switch v := event.Object.(type) { - case *corev1.Node: - nodeFromEvent = v - - default: - t.Logf("skipping object type %T", event.Object) + // Step 1: List existing nodes to get current state and resourceVersion + nodeList, err := k.Typed.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + if ctx.Err() != nil { + break + } + t.Logf("error listing nodes, will retry: %v", err) + time.Sleep(time.Second) continue } - if !strings.HasPrefix(nodeFromEvent.Name, vmssName) { + // Check if node is already ready in the list + for i := range nodeList.Items { + node := &nodeList.Items[i] + if nodeName, ready := isNodeReady(node, vmssName); nodeName != "" { + lastSeenNode = node + if ready { + nodeTaints, _ := json.Marshal(node.Spec.Taints) + nodeConditions, _ := json.Marshal(node.Status.Conditions) + t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions)) + return node.Name + } + } + } + + // Step 2: Watch for changes starting from the list's resourceVersion + watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{ + ResourceVersion: nodeList.ResourceVersion, + }) + if err != nil { + if ctx.Err() != nil { + break + } + t.Logf("error starting watch, will retry: %v", err) + time.Sleep(time.Second) continue } - // found the right node. Use it! - node = nodeFromEvent - nodeTaints, _ := json.Marshal(node.Spec.Taints) - nodeConditions, _ := json.Marshal(node.Status.Conditions) + // Process watch events + watchLoop: + for { + select { + case <-ctx.Done(): + watcher.Stop() + break watchLoop + case event, ok := <-watcher.ResultChan(): + if !ok { + // Watch channel closed, restart the ListWatch + t.Logf("watch closed, restarting ListWatch") + break watchLoop + } + + if event.Type != watch.Added && event.Type != watch.Modified { + continue + } + + node, ok := event.Object.(*corev1.Node) + if !ok { + t.Logf("skipping object type %T", event.Object) + continue + } + + nodeName, ready := isNodeReady(node, vmssName) + if nodeName == "" { + continue + } + + lastSeenNode = node + nodeTaints, _ := json.Marshal(node.Spec.Taints) + nodeConditions, _ := json.Marshal(node.Status.Conditions) - for _, cond := range node.Status.Conditions { - if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue { - t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions)) - return node.Name + if ready { + t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions)) + watcher.Stop() + return node.Name + } + + t.Logf("node %s is not ready yet. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions)) } } - - t.Logf("node %s is not ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions)) } - if node == nil { - t.Fatalf("%q haven't appeared in k8s API server", vmssName) + // Context was cancelled or timed out + if lastSeenNode == nil { + t.Fatalf("%q haven't appeared in k8s API server: %v", vmssName, ctx.Err()) return "" } - nodeString, _ := json.Marshal(node) - t.Fatalf("failed to wait for %q (%s) to be ready %+v. Detail: %s", vmssName, node.Name, node.Status, string(nodeString)) - return node.Name + nodeString, _ := json.Marshal(lastSeenNode) + t.Fatalf("failed to wait for %q (%s) to be ready: %v. Detail: %s", vmssName, lastSeenNode.Name, ctx.Err(), string(nodeString)) + return "" } // GetPodNetworkDebugPodForNode returns a pod that's a member of the 'debugnonhost' daemonset running in the cluster - this will return