e2e/kube.go: 127 changes (96 additions & 31 deletions)
@@ -11,7 +11,6 @@ import (

"github.com/Azure/agentbaker/e2e/config"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"

Copilot AI Feb 7, 2026

The import block includes both corev1 and v1 aliases for the same package path (k8s.io/api/core/v1), which will not compile in Go (duplicate import). Drop one alias and update the few call sites accordingly (e.g., use corev1.Secret / corev1.SecretTypeDockerConfigJson / corev1.DockerConfigJsonKey).

Suggested change
v1 "k8s.io/api/core/v1"

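For illustration, a minimal sketch of the consolidated import block and one hypothetical call site, assuming the v1 alias was only used for the docker-config Secret fields named above (the actual call sites are not shown in this hunk):

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Hypothetical call site updated to use the single corev1 alias.
secret := corev1.Secret{
	ObjectMeta: metav1.ObjectMeta{Name: "registry-creds"}, // name made up for the example
	Type:       corev1.SecretTypeDockerConfigJson,
	Data: map[string][]byte{
		corev1.DockerConfigJsonKey: dockerConfigJSON, // dockerConfigJSON assumed to be defined elsewhere
	},
}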
@@ -143,60 +142,126 @@ func (k *Kubeclient) WaitUntilPodRunning(ctx context.Context, namespace string,
return k.WaitUntilPodRunningWithRetry(ctx, namespace, labelSelector, fieldSelector, 0)
}

// isNodeReady checks if a node matches the vmssName prefix and is in Ready condition.
// Returns (nodeName, isReady) where nodeName is non-empty if the node matches.
func isNodeReady(node *corev1.Node, vmssName string) (string, bool) {
if !strings.HasPrefix(node.Name, vmssName) {
return "", false
}
for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
return node.Name, true
}
}
return node.Name, false
}

func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, t testing.TB, vmssName string) string {
startTime := time.Now()
t.Logf("waiting for node %s to be ready in k8s API", vmssName)
defer func() {
t.Logf("waited for node %s to be ready in k8s API for %s", vmssName, time.Since(startTime))
}()

var node *corev1.Node = nil
watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{})
require.NoError(t, err, "failed to start watching nodes")
defer watcher.Stop()
var lastSeenNode *corev1.Node

for event := range watcher.ResultChan() {
if event.Type != watch.Added && event.Type != watch.Modified {
continue
for {
// Check if context is already cancelled
if ctx.Err() != nil {
break
}

var nodeFromEvent *corev1.Node
switch v := event.Object.(type) {
case *corev1.Node:
nodeFromEvent = v

default:
t.Logf("skipping object type %T", event.Object)
// Step 1: List existing nodes to get current state and resourceVersion
nodeList, err := k.Typed.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
Contributor

I think the LIST+WATCH pattern was important for older versions, where WATCH didn't produce initial events.

There is SendInitialEvents, which seems to be set to true by default.
I don't think you need LIST.
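For illustration, a minimal sketch of the watch-only variant, assuming the apiserver supports initial events on watches (the watch-list feature also expects ResourceVersionMatch and watch bookmarks to be set); k, ctx and t are the surrounding function's variables, and to.Ptr comes from the azcore/to package this file already imports:

watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{
	SendInitialEvents:    to.Ptr(true),
	ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
	AllowWatchBookmarks:  true,
})
if err != nil {
	// Fall back to the List+Watch below if the feature is unavailable.
	t.Logf("watch with initial events failed, falling back to List+Watch: %v", err)
}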

if err != nil {
if ctx.Err() != nil {
break
}
t.Logf("error listing nodes, will retry: %v", err)
time.Sleep(time.Second)
continue
}

if !strings.HasPrefix(nodeFromEvent.Name, vmssName) {
// Check if node is already ready in the list
for i := range nodeList.Items {
node := &nodeList.Items[i]
if nodeName, ready := isNodeReady(node, vmssName); nodeName != "" {
lastSeenNode = node
if ready {
nodeTaints, _ := json.Marshal(node.Spec.Taints)
nodeConditions, _ := json.Marshal(node.Status.Conditions)
t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
return node.Name
}
}
}

// Step 2: Watch for changes starting from the list's resourceVersion
watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{
ResourceVersion: nodeList.ResourceVersion,
})
if err != nil {
if ctx.Err() != nil {
break
}
t.Logf("error starting watch, will retry: %v", err)
time.Sleep(time.Second)
continue
}

// found the right node. Use it!
node = nodeFromEvent
nodeTaints, _ := json.Marshal(node.Spec.Taints)
nodeConditions, _ := json.Marshal(node.Status.Conditions)
// Process watch events
watchLoop:
for {
select {
case <-ctx.Done():
watcher.Stop()
break watchLoop
case event, ok := <-watcher.ResultChan():
if !ok {
// Watch channel closed, restart the ListWatch
t.Logf("watch closed, restarting ListWatch")
Comment on lines +221 to +222

Copilot AI Feb 7, 2026

If the watch is closed immediately/repeatedly (e.g., apiserver timeouts, transient network issues), the outer loop will re-List and re-Watch in a tight loop with no delay on the ok == false path, which can hammer the apiserver. Add a small sleep/backoff when restarting after a closed watch (and consider exponential backoff / RetryWatcher).

Suggested change
// Watch channel closed, restart the ListWatch
t.Logf("watch closed, restarting ListWatch")
// Watch channel closed, restart the ListWatch after a short delay
t.Logf("watch closed, restarting ListWatch after short delay")
watcher.Stop()
time.Sleep(500 * time.Millisecond)

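For illustration, a minimal sketch of the RetryWatcher alternative mentioned above; it assumes extra imports of k8s.io/client-go/tools/cache and watchtools "k8s.io/client-go/tools/watch", which this file does not currently have, and it reuses the surrounding function's k, ctx, t and nodeList:

lw := &cache.ListWatch{
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return k.Typed.CoreV1().Nodes().Watch(ctx, options)
	},
}
// RetryWatcher restarts the underlying watch with a delay on transient errors
// and resumes from the last resourceVersion it saw, so the manual re-List /
// re-Watch loop and its tight-loop risk go away.
rw, err := watchtools.NewRetryWatcher(nodeList.ResourceVersion, lw)
if err != nil {
	t.Logf("failed to start retry watcher: %v", err)
	return ""
}
defer rw.Stop()
// rw.ResultChan() is then consumed the same way as watcher.ResultChan() here.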
break watchLoop
}
Comment on lines +219 to +224

Copilot AI Feb 7, 2026

When watcher.ResultChan() is closed (ok == false), the code breaks out to restart the ListWatch but never calls watcher.Stop(). Even if the channel is closed, calling Stop() is the safe/idiomatic way to release the watch and avoid lingering goroutines/resources before retrying.

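For illustration, a minimal sketch of the shape this branch could take per the two comments above; the 500ms pause is an arbitrary choice for the example, not taken from the PR:

case event, ok := <-watcher.ResultChan():
	if !ok {
		// Watch channel closed: release the watch and pause briefly before
		// the outer loop re-Lists and re-Watches.
		watcher.Stop()
		t.Logf("watch closed, restarting ListWatch after a short delay")
		time.Sleep(500 * time.Millisecond)
		break watchLoop
	}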

if event.Type != watch.Added && event.Type != watch.Modified {
continue
}

node, ok := event.Object.(*corev1.Node)
if !ok {
t.Logf("skipping object type %T", event.Object)
continue
}

nodeName, ready := isNodeReady(node, vmssName)
if nodeName == "" {
continue
}

lastSeenNode = node
nodeTaints, _ := json.Marshal(node.Spec.Taints)
nodeConditions, _ := json.Marshal(node.Status.Conditions)

for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
return node.Name
if ready {
t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
watcher.Stop()
return node.Name
}

t.Logf("node %s is not ready yet. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
}
}

t.Logf("node %s is not ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
}

if node == nil {
t.Fatalf("%q haven't appeared in k8s API server", vmssName)
// Context was cancelled or timed out
if lastSeenNode == nil {
t.Fatalf("%q haven't appeared in k8s API server: %v", vmssName, ctx.Err())
return ""
}

nodeString, _ := json.Marshal(node)
t.Fatalf("failed to wait for %q (%s) to be ready %+v. Detail: %s", vmssName, node.Name, node.Status, string(nodeString))
return node.Name
nodeString, _ := json.Marshal(lastSeenNode)
t.Fatalf("failed to wait for %q (%s) to be ready: %v. Detail: %s", vmssName, lastSeenNode.Name, ctx.Err(), string(nodeString))
return ""
}

// GetPodNetworkDebugPodForNode returns a pod that's a member of the 'debugnonhost' daemonset running in the cluster - this will return