Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions cmd/deployment-tracker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"context"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -38,38 +38,44 @@ func main() {
flag.IntVar(&workers, "workers", 2, "number of worker goroutines")
flag.Parse()

// init logging
opts := slog.HandlerOptions{Level: slog.LevelInfo}
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &opts)))

var cntrlCfg = controller.Config{
Template: getEnvOrDefault("DN_TEMPLATE", defaultTemplate),
LogicalEnvironment: os.Getenv("LOGICAL_ENVIRONMENT"),
PhysicalEnvironment: os.Getenv("PHYSICAL_ENVIRONMENT"),
Cluster: os.Getenv("CLUSTER"),
APIToken: getEnvOrDefault("API_TOKEN", ""),
BaseURL: getEnvOrDefault("BASE_URL", "api.github.com"),
Org: os.Getenv("ORG"),
Organization: os.Getenv("GITHUB_ORG"),
}

if cntrlCfg.LogicalEnvironment == "" {
fmt.Fprint(os.Stderr, "Logical environment is required\n")
slog.Error("Logical environment is required")
os.Exit(1)
}
if cntrlCfg.Cluster == "" {
fmt.Fprint(os.Stderr, "Cluster is required\n")
slog.Error("Cluster is required")
os.Exit(1)
}
if cntrlCfg.Org == "" {
fmt.Fprint(os.Stderr, "Org is required\n")
if cntrlCfg.Organization == "" {
slog.Error("Organiation is required")
os.Exit(1)
}

k8sCfg, err := createK8sConfig(kubeconfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating Kubernetes config: %v\n", err)
slog.Error("Failed to create Kubernetes config",
"error", err)
os.Exit(1)
}

clientset, err := kubernetes.NewForConfig(k8sCfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating Kubernetes client: %v\n", err)
slog.Error("Error creating Kubernetes client",
"error", err)
os.Exit(1)
}

Expand All @@ -79,15 +85,16 @@ func main() {
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Println("\nShutting down...")
slog.Info("Shutting down...")
cancel()
}()

cntrl := controller.New(clientset, namespace, &cntrlCfg)

fmt.Println("Starting deployment-tracker controller")
slog.Info("Starting deployment-tracker controller")
if err := cntrl.Run(ctx, workers); err != nil {
fmt.Fprintf(os.Stderr, "Error running controller: %v\n", err)
slog.Error("Error running controller",
"error", err)
cancel()
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions deploy/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ spec:
imagePullPolicy: IfNotPresent
env:
- name: DN_TEMPLATE
value: "{{namespace}}_{{deploymentName}}_{{containerName}}"
- name: ORG
value: "{{namespace}}/{{deploymentName}}/{{containerName}}"
- name: GITHUB_ORG
value: "test-org"
- name: LOGICAL_ENVIRONMENT
value: "staging"
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ type Config struct {
Cluster string
APIToken string
BaseURL string
Org string
Organization string
}
108 changes: 79 additions & 29 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package controller
import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"time"

Expand Down Expand Up @@ -40,7 +40,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) *Control
// Create informer factory
var factory informers.SharedInformerFactory
if namespace == "" {
factory = informers.NewSharedInformerFactory(clientset, 30*time.Second)
factory = informers.NewSharedInformerFactory(clientset,
30*time.Second,
)
} else {
factory = informers.NewSharedInformerFactoryWithOptions(
clientset,
Expand All @@ -61,7 +63,11 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) *Control
if cfg.APIToken != "" {
clientOpts = append(clientOpts, deploymentrecord.WithAPIToken(cfg.APIToken))
}
apiClient := deploymentrecord.NewClient(cfg.BaseURL, cfg.Org, clientOpts...)
apiClient := deploymentrecord.NewClient(
cfg.BaseURL,
cfg.Organization,
clientOpts...,
)

cntrl := &Controller{
clientset: clientset,
Expand All @@ -76,30 +82,41 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) *Control
AddFunc: func(obj any) {
pod, ok := obj.(*corev1.Pod)
if !ok {
fmt.Printf("error: invalid object returned: %+v\n",
obj)
slog.Error("Invalid object returned",
"object", obj,
)
return
}

// Only process pods that are running
if pod.Status.Phase == corev1.PodRunning {
key, err := cache.MetaNamespaceKeyFunc(obj)

// For our purposes, there are in practice
// no error event we care about, so don't
// bother with handling it.
if err == nil {
queue.Add(PodEvent{Key: key, EventType: "CREATED", Pod: pod.DeepCopy()})
queue.Add(PodEvent{
Key: key,
EventType: "CREATED",
Pod: pod.DeepCopy(),
})
}
}
},
UpdateFunc: func(oldObj, newObj any) {
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
fmt.Printf("error: invalid object returned: %+v\n",
oldObj)
slog.Error("Invalid old object returned",
"object", oldObj,
)
return
}
newPod, ok := newObj.(*corev1.Pod)
if !ok {
fmt.Printf("error: invalid object returned: %+v\n",
newObj)
slog.Error("Invalid new object returned",
"object", newObj,
)
return
}

Expand All @@ -109,10 +126,19 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) *Control
}

// Only process if pod just became running
if oldPod.Status.Phase != corev1.PodRunning && newPod.Status.Phase == corev1.PodRunning {
if oldPod.Status.Phase != corev1.PodRunning &&
newPod.Status.Phase == corev1.PodRunning {
key, err := cache.MetaNamespaceKeyFunc(newObj)

// For our purposes, there are in practice
// no error event we care about, so don't
// bother with handling it.
if err == nil {
queue.Add(PodEvent{Key: key, EventType: "CREATED", Pod: newPod.DeepCopy()})
queue.Add(PodEvent{
Key: key,
EventType: "CREATED",
Pod: newPod.DeepCopy(),
})
}
}
},
Expand All @@ -130,14 +156,23 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) *Control
}
}
key, err := cache.MetaNamespaceKeyFunc(obj)

// For our purposes, there are in practice
// no error event we care about, so don't
// bother with handling it.
if err == nil {
queue.Add(PodEvent{Key: key, EventType: "DELETED", Pod: pod.DeepCopy()})
queue.Add(PodEvent{
Key: key,
EventType: "DELETED",
Pod: pod.DeepCopy(),
})
}
},
})

if err != nil {
fmt.Printf("ERROR: failed to add event handlers: %s\n", err)
slog.Error("Failed to add event handlers",
"error", err)
}

return cntrl
Expand All @@ -148,28 +183,30 @@ func (c *Controller) Run(ctx context.Context, workers int) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

fmt.Println("Starting pod informer")
slog.Info("Starting pod informer")

// Start the informer
go c.podInformer.Run(ctx.Done())

// Wait for the cache to be synced
fmt.Println("Waiting for informer cache to sync")
slog.Info("Waiting for informer cache to sync")
if !cache.WaitForCacheSync(ctx.Done(), c.podInformer.HasSynced) {
return errors.New("timed out waiting for caches to sync")
}

fmt.Printf("Starting %d workers\n", workers)
slog.Info("Starting workers",
"count", workers,
)

// Start workers
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}

fmt.Println("Controller started")
slog.Info("Controller started")

<-ctx.Done()
fmt.Println("Shutting down workers")
slog.Info("Shutting down workers")

return nil
}
Expand All @@ -195,16 +232,17 @@ func (c *Controller) processNextItem(ctx context.Context) bool {
}

// Requeue on error with rate limiting
fmt.Printf("Error processing %s: %v, requeuing\n", event.Key, err)
slog.Error("Failed to process event, requeuing",
"event_key", event.Key,
"error", err,
)
c.workqueue.AddRateLimited(event)

return true
}

// processEvent processes a single pod event.
func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
timestamp := time.Now().Format(time.RFC3339)

pod := event.Pod
if pod == nil {
return nil
Expand All @@ -219,14 +257,14 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {

// Record info for each container in the pod
for _, container := range pod.Spec.Containers {
if err := c.recordContainer(ctx, pod, container, status, event.EventType, timestamp); err != nil {
if err := c.recordContainer(ctx, pod, container, status, event.EventType); err != nil {
lastErr = err
}
}

// Also record init containers
for _, container := range pod.Spec.InitContainers {
if err := c.recordContainer(ctx, pod, container, status, event.EventType, timestamp); err != nil {
if err := c.recordContainer(ctx, pod, container, status, event.EventType); err != nil {
lastErr = err
}
}
Expand All @@ -235,7 +273,7 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
}

// recordContainer records a single container's deployment info.
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType, timestamp string) error {
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string) error {
dn := getARDeploymentName(pod, container, c.cfg.Template)
digest := getContainerDigest(pod, container.Name)

Expand All @@ -259,13 +297,25 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
)

if err := c.apiClient.PostOne(ctx, record); err != nil {
fmt.Printf("[%s] FAILED %s name=%s deployment_name=%s error=%v\n",
timestamp, eventType, record.Name, record.DeploymentName, err)
slog.Error("Failed to post record",
"event_type", eventType,
"name", record.Name,
"deployment_name", record.DeploymentName,
"status", record.Status,
"digest", record.Digest,
"error", err,
)
return err
}

fmt.Printf("[%s] OK %s name=%s deployment_name=%s digest=%s status=%s\n",
timestamp, eventType, record.Name, record.DeploymentName, record.Digest, record.Status)
slog.Info("Posted record",
"event_type", eventType,
"name", record.Name,
"deployment_name", record.DeploymentName,
"status", record.Status,
"digest", record.Digest,
)

return nil
}

Expand Down