chore(e2e): trying new listWatch logic for WaitNodeReady #7826
base: main
@@ -11,7 +11,6 @@ import (
 	"github.com/Azure/agentbaker/e2e/config"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/stretchr/testify/require"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
@@ -143,60 +142,126 @@ func (k *Kubeclient) WaitUntilPodRunning(ctx context.Context, namespace string,
 	return k.WaitUntilPodRunningWithRetry(ctx, namespace, labelSelector, fieldSelector, 0)
 }

+// isNodeReady checks if a node matches the vmssName prefix and is in Ready condition.
+// Returns (nodeName, isReady) where nodeName is non-empty if the node matches.
+func isNodeReady(node *corev1.Node, vmssName string) (string, bool) {
+	if !strings.HasPrefix(node.Name, vmssName) {
+		return "", false
+	}
+	for _, cond := range node.Status.Conditions {
+		if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
+			return node.Name, true
+		}
+	}
+	return node.Name, false
+}
+
 func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, t testing.TB, vmssName string) string {
 	startTime := time.Now()
 	t.Logf("waiting for node %s to be ready in k8s API", vmssName)
 	defer func() {
 		t.Logf("waited for node %s to be ready in k8s API for %s", vmssName, time.Since(startTime))
 	}()

-	var node *corev1.Node = nil
-	watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{})
-	require.NoError(t, err, "failed to start watching nodes")
-	defer watcher.Stop()
-
-	for event := range watcher.ResultChan() {
-		if event.Type != watch.Added && event.Type != watch.Modified {
-			continue
-		}
-
-		var nodeFromEvent *corev1.Node
-		switch v := event.Object.(type) {
-		case *corev1.Node:
-			nodeFromEvent = v
-		default:
-			t.Logf("skipping object type %T", event.Object)
-		}
+	var lastSeenNode *corev1.Node
+
+	for {
+		// Check if context is already cancelled
+		if ctx.Err() != nil {
+			break
+		}
+
+		// Step 1: List existing nodes to get current state and resourceVersion
+		nodeList, err := k.Typed.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
Contributor:

I think the LIST+WATCH pattern was important for older versions, where WATCH didn't produce initial events. There is …
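(Not part of the PR: a sketch of one newer alternative. Since Kubernetes 1.27, behind the WatchList feature gate, a single Watch request can stream the initial state via SendInitialEvents, so no explicit LIST is needed. The fields below are real metav1.ListOptions fields; whether the e2e clusters enable the feature is an assumption.)

	// Sketch: single Watch that replays the current state before live events.
	sendInitialEvents := true
	watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{
		SendInitialEvents:    &sendInitialEvents,                      // replay existing nodes as Added events first
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, // required alongside SendInitialEvents
		AllowWatchBookmarks:  true,                                    // required alongside SendInitialEvents
	})
	// The server sends a Bookmark event annotated "k8s.io/initial-events-end"
	// once the replayed initial state is complete; live events follow.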
+		if err != nil {
+			if ctx.Err() != nil {
+				break
+			}
+			t.Logf("error listing nodes, will retry: %v", err)
+			time.Sleep(time.Second)
+			continue
+		}
+
-		if !strings.HasPrefix(nodeFromEvent.Name, vmssName) {
+		// Check if node is already ready in the list
+		for i := range nodeList.Items {
+			node := &nodeList.Items[i]
+			if nodeName, ready := isNodeReady(node, vmssName); nodeName != "" {
+				lastSeenNode = node
+				if ready {
+					nodeTaints, _ := json.Marshal(node.Spec.Taints)
+					nodeConditions, _ := json.Marshal(node.Status.Conditions)
+					t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
+					return node.Name
+				}
+			}
+		}
+
+		// Step 2: Watch for changes starting from the list's resourceVersion
+		watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{
+			ResourceVersion: nodeList.ResourceVersion,
+		})
+		if err != nil {
+			if ctx.Err() != nil {
+				break
+			}
+			t.Logf("error starting watch, will retry: %v", err)
+			time.Sleep(time.Second)
+			continue
+		}
+
-		// found the right node. Use it!
-		node = nodeFromEvent
-		nodeTaints, _ := json.Marshal(node.Spec.Taints)
-		nodeConditions, _ := json.Marshal(node.Status.Conditions)
+		// Process watch events
+	watchLoop:
+		for {
+			select {
+			case <-ctx.Done():
+				watcher.Stop()
+				break watchLoop
+			case event, ok := <-watcher.ResultChan():
+				if !ok {
+					// Watch channel closed, restart the ListWatch
+					t.Logf("watch closed, restarting ListWatch")
|
Comment on lines
+221
to
+222
|
||||||||||||||
| // Watch channel closed, restart the ListWatch | |
| t.Logf("watch closed, restarting ListWatch") | |
| // Watch channel closed, restart the ListWatch after a short delay | |
| t.Logf("watch closed, restarting ListWatch after short delay") | |
| watcher.Stop() | |
| time.Sleep(500 * time.Millisecond) |
Copilot
AI
Feb 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When watcher.ResultChan() is closed (ok == false), the code breaks out to restart the ListWatch but never calls watcher.Stop(). Even if the channel is closed, calling Stop() is the safe/idiomatic way to release the watch and avoid lingering goroutines/resources before retrying.
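(A hedged alternative, not proposed in the PR: client-go already packages the LIST+WATCH+restart bookkeeping this thread is reviewing. The sketch below assumes the isNodeReady helper from this diff, plus imports of k8s.io/apimachinery/pkg/runtime, k8s.io/apimachinery/pkg/watch, k8s.io/client-go/tools/cache, and watchtools "k8s.io/client-go/tools/watch".)

	// Sketch: the same wait built on client-go's watch helpers.
	lw := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			return k.Typed.CoreV1().Nodes().List(ctx, opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			return k.Typed.CoreV1().Nodes().Watch(ctx, opts)
		},
	}
	// UntilWithSync lists, then watches from the list's resourceVersion,
	// restarting expired watches internally, until the condition returns
	// true or ctx is done.
	ev, err := watchtools.UntilWithSync(ctx, lw, &corev1.Node{}, nil,
		func(event watch.Event) (bool, error) {
			if event.Type == watch.Deleted {
				return false, nil
			}
			node, ok := event.Object.(*corev1.Node)
			if !ok {
				return false, nil
			}
			_, ready := isNodeReady(node, vmssName)
			return ready, nil
		})
	require.NoError(t, err, "failed waiting for node to become ready")
	return ev.Object.(*corev1.Node).Name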
Review comment on the import block:

The import block includes both corev1 and v1 aliases for the same package path (k8s.io/api/core/v1). Go tolerates a duplicate import under distinct names, but it is redundant and linters will flag it. Drop one alias and update the few call sites accordingly (e.g., use corev1.Secret / corev1.SecretTypeDockerConfigJson / corev1.DockerConfigJsonKey).
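(A minimal sketch of the deduplicated import block, using the call-site names from the comment above:)

	import (
		corev1 "k8s.io/api/core/v1" // single alias; former v1.* call sites move to corev1.*
	)

	// e.g.:
	//   v1.Secret                     -> corev1.Secret
	//   v1.SecretTypeDockerConfigJson -> corev1.SecretTypeDockerConfigJson
	//   v1.DockerConfigJsonKey        -> corev1.DockerConfigJsonKey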