diff --git a/.gitignore b/.gitignore index d8b5cd02..df1d3447 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,6 @@ tmp/ # VSCode settings .vscode/ + +# Claude Code +.claude/ diff --git a/contrib/cmd/runkperf/commands/data/client.go b/contrib/cmd/runkperf/commands/data/client.go index 47e6df2b..3c894fe1 100644 --- a/contrib/cmd/runkperf/commands/data/client.go +++ b/contrib/cmd/runkperf/commands/data/client.go @@ -4,11 +4,21 @@ package data import ( + "context" + "fmt" + "math/rand" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/flowcontrol" ) +// default label value used to identify resources created by runkperf. +const AppLabel = "runkperf" + // NewClientset creates a Kubernetes clientset with default rate limiting. func NewClientset(kubeCfgPath string) (*kubernetes.Clientset, error) { config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) @@ -29,3 +39,52 @@ func NewClientsetWithRateLimiter(kubeCfgPath string, qps float32, burst int) (*k config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) return kubernetes.NewForConfig(config) } + +// PrepareNamespace creates the namespace if it does not already exist. +// It skips creation for the "default" namespace. +func PrepareNamespace(clientset *kubernetes.Clientset, namespace string) error { + if namespace == "" { + return fmt.Errorf("namespace cannot be empty") + } + + if namespace == "default" { + return nil + } + + _, err := clientset.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + }, metav1.CreateOptions{}) + if err != nil { + if errors.IsAlreadyExists(err) { + return nil + } + return fmt.Errorf("failed to create namespace %s: %w", namespace, err) + } + return nil +} + +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") + +// RandBytes generates a random byte slice of length n using alphanumeric characters. +func RandBytes(n int) ([]byte, error) { + if n <= 0 { + return nil, fmt.Errorf("length must be positive") + } + + b := make([]byte, n) + for i := range b { + b[i] = byte(letterRunes[rand.Intn(len(letterRunes))]) //nolint:gosec // G404: intentionally using math/rand for benchmark data, cryptographic randomness not needed + } + return b, nil +} + +// RandString generates a random string of length n using alphanumeric characters. +func RandString(n int) (string, error) { + b, err := RandBytes(n) + if err != nil { + return "", err + } + return string(b), nil +} diff --git a/contrib/cmd/runkperf/commands/data/configmaps/configmap.go b/contrib/cmd/runkperf/commands/data/configmaps/configmap.go index 6ce936a7..4f18fdb7 100644 --- a/contrib/cmd/runkperf/commands/data/configmaps/configmap.go +++ b/contrib/cmd/runkperf/commands/data/configmaps/configmap.go @@ -5,9 +5,7 @@ package configmaps import ( "context" - "crypto/rand" "fmt" - "math/big" "os" "strconv" "strings" @@ -26,8 +24,6 @@ import ( "k8s.io/client-go/kubernetes" ) -var appLebel = "runkperf" - var Command = cli.Command{ Name: "configmap", ShortName: "cm", @@ -40,7 +36,7 @@ var Command = cli.Command{ }, cli.StringFlag{ Name: "namespace", - Usage: "Namespace to use with commands. If the namespace does not exist, it will be created.", + Usage: "Namespace to use with commands. If the namespace does not exist, it will be created when executing add subcommand", Value: "default", }, }, @@ -111,7 +107,7 @@ var configmapAddCommand = cli.Command{ return err } - err = prepareNamespace(clientset, namespace) + err = data.PrepareNamespace(clientset, namespace) if err != nil { return err } @@ -141,7 +137,7 @@ var configmapDelCommand = cli.Command{ namespace := cliCtx.GlobalString("namespace") kubeCfgPath := cliCtx.GlobalString("kubeconfig") - labelSelector := fmt.Sprintf("app=%s,cmName=%s", appLebel, cmName) + labelSelector := fmt.Sprintf("app=%s,cmName=%s", data.AppLabel, cmName) clientset, err := data.NewClientset(kubeCfgPath) if err != nil { @@ -187,12 +183,12 @@ var configmapListCommand = cli.Command{ // If args are provided, list all configmaps with the label app=runkperf and cmName in (args) var labelSelector string if cliCtx.NArg() == 0 { - labelSelector = fmt.Sprintf("app=%s", appLebel) + labelSelector = fmt.Sprintf("app=%s", data.AppLabel) } else { args := cliCtx.Args() namesStr := strings.Join(args, ",") - labelSelector = fmt.Sprintf("app=%s, cmName in (%s)", appLebel, namesStr) + labelSelector = fmt.Sprintf("app=%s, cmName in (%s)", data.AppLabel, namesStr) } cmMap := make(map[string][]int) err = listConfigmapsByName(clientset, labelSelector, namespace, cmMap) @@ -213,30 +209,6 @@ var configmapListCommand = cli.Command{ }, } -func prepareNamespace(clientset *kubernetes.Clientset, namespace string) error { - if namespace == "" { - return fmt.Errorf("namespace cannot be empty") - } - - if namespace == "default" { - return nil - } - - _, err := clientset.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespace, - }, - }, metav1.CreateOptions{}) - if err != nil { - // If the namespace already exists, ignore the error - if errors.IsAlreadyExists(err) { - return nil - } - return fmt.Errorf("failed to create namespace %s: %v", namespace, err) - } - return nil -} - func checkConfigmapParams(size int, groupSize int, total int) error { if size <= 0 { return fmt.Errorf("size must be greater than 0") @@ -253,24 +225,6 @@ func checkConfigmapParams(size int, groupSize int, total int) error { return nil } -var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - -func randString(n int) (string, error) { - if n <= 0 { - return "", fmt.Errorf("length must be positive") - } - - b := make([]rune, n) - for i := range b { - random, err := rand.Int(rand.Reader, big.NewInt(int64(len(letterRunes)))) - if err != nil { - return "", fmt.Errorf("error generating random number: %w", err) - } - b[i] = letterRunes[int(random.Int64())] - } - return string(b), nil -} - func createConfigmaps(clientset *kubernetes.Clientset, namespace string, cmName string, size int, groupSize int, total int) error { // Generate configmaps in parallel with fixed group size // and random data @@ -281,22 +235,22 @@ func createConfigmaps(clientset *kubernetes.Clientset, namespace string, cmName g.Go(func() error { cli := clientset.CoreV1().ConfigMaps(namespace) - name := fmt.Sprintf("%s-cm-%s-%d", appLebel, cmName, j) + name := fmt.Sprintf("%s-cm-%s-%d", data.AppLabel, cmName, j) cm := &corev1.ConfigMap{} cm.Name = name // Set the labels for the configmap to easily identify in delete or list commands cm.Labels = map[string]string{ "ownerID": strconv.Itoa(ownerID), - "app": appLebel, + "app": data.AppLabel, "cmName": cmName, } - data, err := randString(size) + randData, err := data.RandString(size) if err != nil { return fmt.Errorf("failed to generate random string for configmap %s: %v", name, err) } cm.Data = map[string]string{ - "data": data, + "data": randData, } _, err = cli.Create(context.TODO(), cm, metav1.CreateOptions{}) diff --git a/contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go b/contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go index 9c7b79f1..79d3b060 100644 --- a/contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go +++ b/contrib/cmd/runkperf/commands/data/daemonsets/daemonset.go @@ -11,8 +11,7 @@ import ( "text/tabwriter" "github.com/Azure/kperf/cmd/kperf/commands/utils" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/flowcontrol" + "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data" "github.com/urfave/cli" @@ -76,12 +75,12 @@ var daemonsetAddCommand = cli.Command{ return fmt.Errorf("count must be greater than 0") } - err := prepareNamespace(kubeCfgPath, namespace) + clientset, err := data.NewClientsetWithRateLimiter(kubeCfgPath, 30, 10) if err != nil { return err } - clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) + err = data.PrepareNamespace(clientset, namespace) if err != nil { return err } @@ -113,7 +112,7 @@ var daemonsetDelCommand = cli.Command{ namespace := cliCtx.GlobalString("namespace") kubeCfgPath := cliCtx.GlobalString("kubeconfig") - clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) + clientset, err := data.NewClientsetWithRateLimiter(kubeCfgPath, 30, 10) if err != nil { return err } @@ -139,7 +138,7 @@ var daemonsetListCommand = cli.Command{ Action: func(cliCtx *cli.Context) error { namespace := cliCtx.GlobalString("namespace") kubeCfgPath := cliCtx.GlobalString("kubeconfig") - clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) + clientset, err := data.NewClientsetWithRateLimiter(kubeCfgPath, 30, 10) if err != nil { return err } @@ -192,50 +191,6 @@ var daemonsetListCommand = cli.Command{ }, } -func prepareNamespace(kubeCfgPath string, namespace string) error { - if namespace == "" { - return fmt.Errorf("namespace cannot be empty") - } - - if namespace == "default" { - return nil - } - - clientset, err := newClientsetWithRateLimiter(kubeCfgPath, 30, 10) - if err != nil { - return err - } - - _, err = clientset.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespace, - }, - }, metav1.CreateOptions{}) - if err != nil { - // If the namespace already exists, ignore the error - if errors.IsAlreadyExists(err) { - return nil - } - return fmt.Errorf("failed to create namespace %s: %v", namespace, err) - } - return nil -} - -func newClientsetWithRateLimiter(kubeCfgPath string, qps float32, burst int) (*kubernetes.Clientset, error) { - config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) - if err != nil { - return nil, err - } - - config.QPS = qps - config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - return clientset, nil -} - func createDaemonsets(clientset *kubernetes.Clientset, namespace string, dsName string, count int) error { for i := 0; i < count; i++ { ds := &appsv1.DaemonSet{ diff --git a/contrib/cmd/runkperf/commands/data/events/event.go b/contrib/cmd/runkperf/commands/data/events/event.go new file mode 100644 index 00000000..dee42461 --- /dev/null +++ b/contrib/cmd/runkperf/commands/data/events/event.go @@ -0,0 +1,410 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package events + +import ( + "context" + "fmt" + "os" + "strings" + "text/tabwriter" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/Azure/kperf/cmd/kperf/commands/utils" + "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data" + + "github.com/urfave/cli" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +// defaultBatchSize is the number of events to list or delete per batch. +// It is used as the page size for paginated API calls. +const defaultBatchSize int64 = 300 + +var Command = cli.Command{ + Name: "event", + ShortName: "ev", + Usage: "Manage events", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "kubeconfig", + Usage: "Path to the kubeconfig file", + Value: utils.DefaultKubeConfigPath, + }, + cli.StringFlag{ + Name: "namespace", + Usage: "Namespace to use with commands. If the namespace does not exist, it will be created when executing add subcommand.", + Value: "default", + }, + }, + Subcommands: []cli.Command{ + eventAddCommand, + eventDelCommand, + eventListCommand, + }, +} + +var eventAddCommand = cli.Command{ + Name: "add", + Usage: "Add events set", + ArgsUsage: "NAME of the events set", + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "total", + Usage: "Total number of events to create", + Value: 10, + }, + cli.IntFlag{ + Name: "group-size", + Usage: "Number of events to create in parallel per batch", + Value: 30, + }, + cli.StringFlag{ + Name: "reason", + Usage: "The reason string for the events (e.g. ScaleUp, FailedScheduling)", + Value: "BenchmarkEvent", + }, + cli.StringFlag{ + Name: "type", + Usage: "The type of the events: Normal or Warning", + Value: "Normal", + }, + cli.IntFlag{ + Name: "message-size", + Usage: "The size of the event message in bytes (default targets ~5KB total event size; k8s metadata overhead is ~500-700 bytes)", + Value: 5000, + }, + cli.StringFlag{ + Name: "involved-object-kind", + Usage: "The kind of the involved object (e.g. Pod, Node, Deployment)", + Value: "Pod", + }, + cli.StringFlag{ + Name: "involved-object-name", + Usage: "The name of the involved object. If empty, a name will be generated.", + Value: "", + }, + cli.Float64Flag{ + Name: "qps", + Usage: "QPS for the Kubernetes client rate limiter to control event operations", + Value: 100, + }, + cli.IntFlag{ + Name: "burst", + Usage: "Burst for the Kubernetes client rate limiter to control event operations", + Value: 200, + }, + }, + Action: func(cliCtx *cli.Context) error { + if cliCtx.NArg() != 1 { + return fmt.Errorf("required only one argument as events set name: %v", cliCtx.Args()) + } + eventSetName := strings.TrimSpace(cliCtx.Args().Get(0)) + if len(eventSetName) == 0 { + return fmt.Errorf("required non-empty event set name") + } + + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + namespace := cliCtx.GlobalString("namespace") + total := cliCtx.Int("total") + groupSize := cliCtx.Int("group-size") + reason := cliCtx.String("reason") + eventType := cliCtx.String("type") + messageSize := cliCtx.Int("message-size") + involvedObjectKind := cliCtx.String("involved-object-kind") + involvedObjectName := cliCtx.String("involved-object-name") + qps := float32(cliCtx.Float64("qps")) + burst := cliCtx.Int("burst") + + if total <= 0 { + return fmt.Errorf("total must be greater than 0") + } + if groupSize <= 0 { + return fmt.Errorf("group-size must be greater than 0") + } + if groupSize > total { + return fmt.Errorf("group-size must be less than or equal to total") + } + if eventType != "Normal" && eventType != "Warning" { + return fmt.Errorf("type must be either Normal or Warning") + } + if messageSize <= 0 { + return fmt.Errorf("message-size must be greater than 0") + } + + clientset, err := data.NewClientsetWithRateLimiter(kubeCfgPath, qps, burst) + if err != nil { + return err + } + + err = data.PrepareNamespace(clientset, namespace) + if err != nil { + return err + } + + err = createEvents(clientset, namespace, eventSetName, total, groupSize, reason, eventType, messageSize, involvedObjectKind, involvedObjectName) + if err != nil { + return err + } + fmt.Printf("Created %d events (set=%s, reason=%s, type=%s) in namespace %s\n", + total, eventSetName, reason, eventType, namespace) + return nil + }, +} + +var eventDelCommand = cli.Command{ + Name: "delete", + ShortName: "del", + ArgsUsage: "NAME", + Usage: "Delete an events set", + Flags: []cli.Flag{ + cli.Float64Flag{ + Name: "qps", + Usage: "QPS for the Kubernetes client rate limiter to control event deletion", + Value: 100, + }, + cli.IntFlag{ + Name: "burst", + Usage: "Burst for the Kubernetes client rate limiter to control event deletion", + Value: 200, + }, + cli.IntFlag{ + Name: "group-size", + Usage: "Number of events to delete in parallel per batch", + Value: 30, + }, + }, + Action: func(cliCtx *cli.Context) error { + if cliCtx.NArg() != 1 { + return fmt.Errorf("required only one events set name") + } + eventSetName := strings.TrimSpace(cliCtx.Args().Get(0)) + if len(eventSetName) == 0 { + return fmt.Errorf("required non-empty events set name") + } + + namespace := cliCtx.GlobalString("namespace") + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + qps := float32(cliCtx.Float64("qps")) + burst := cliCtx.Int("burst") + groupSize := cliCtx.Int("group-size") + if groupSize <= 0 { + return fmt.Errorf("group-size must be greater than 0") + } + labelSelector := fmt.Sprintf("app=%s,eventSetName=%s", data.AppLabel, eventSetName) + + clientset, err := data.NewClientsetWithRateLimiter(kubeCfgPath, qps, burst) + if err != nil { + return err + } + + err = deleteEvents(clientset, labelSelector, namespace, groupSize) + if err != nil { + return err + } + + fmt.Printf("Deleted events set %s in %s namespace\n", eventSetName, namespace) + return nil + }, +} + +var eventListCommand = cli.Command{ + Name: "list", + Usage: "List generated event sets. Lists all if no arguments are given; otherwise, provide event set names separated by spaces.", + ArgsUsage: "[NAME ...]", + Action: func(cliCtx *cli.Context) error { + namespace := cliCtx.GlobalString("namespace") + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + clientset, err := data.NewClientset(kubeCfgPath) + if err != nil { + return err + } + + const ( + minWidth = 1 + tabWidth = 12 + padding = 3 + padChar = ' ' + flags = 0 + ) + tw := tabwriter.NewWriter(os.Stdout, minWidth, tabWidth, padding, padChar, flags) + fmt.Fprintln(tw, "NAME\tTYPE\tREASON\tTOTAL\t") + + var labelSelector string + if cliCtx.NArg() == 0 { + labelSelector = fmt.Sprintf("app=%s", data.AppLabel) + } else { + args := cliCtx.Args() + namesStr := strings.Join(args, ",") + labelSelector = fmt.Sprintf("app=%s, eventSetName in (%s)", data.AppLabel, namesStr) + } + + evMap := make(map[string]*eventSetInfo) + err = listEvents(clientset, labelSelector, namespace, defaultBatchSize, func(ev corev1.Event) error { + name, ok := ev.Labels["eventSetName"] + if !ok { + return nil + } + + info, ok := evMap[name] + if !ok { + info = &eventSetInfo{ + eventType: ev.Type, + reason: ev.Reason, + } + evMap[name] = info + } + info.total++ + return nil + }) + if err != nil { + return err + } + + for name, info := range evMap { + fmt.Fprintf(tw, "%s\t%s\t%s\t%d\n", + name, + info.eventType, + info.reason, + info.total, + ) + } + return tw.Flush() + }, +} + +type eventSetInfo struct { + eventType string + reason string + total int +} + +func createEvents(clientset *kubernetes.Clientset, namespace, eventSetName string, total, groupSize int, reason, eventType string, messageSize int, involvedObjectKind, involvedObjectName string) error { + now := time.Now() + + for i := 0; i < total; i += groupSize { + g := new(errgroup.Group) + for j := i; j < i+groupSize && j < total; j++ { + idx := j + g.Go(func() error { + name := fmt.Sprintf("%s-ev-%s-%d", data.AppLabel, eventSetName, idx) + + message, err := data.RandString(messageSize) + if err != nil { + return fmt.Errorf("failed to generate random message for event %s: %v", name, err) + } + + objName := involvedObjectName + if objName == "" { + objName = fmt.Sprintf("%s-obj-%d", eventSetName, idx) + } + + event := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + "app": data.AppLabel, + "eventSetName": eventSetName, + }, + }, + InvolvedObject: corev1.ObjectReference{ + Kind: involvedObjectKind, + Name: objName, + Namespace: namespace, + }, + Reason: reason, + Message: message, + Type: eventType, + Count: 1, + FirstTimestamp: metav1.NewTime(now), + LastTimestamp: metav1.NewTime(now), + Source: corev1.EventSource{ + Component: "runkperf-bench", + }, + } + + _, err = clientset.CoreV1().Events(namespace).Create(context.TODO(), event, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create event %s: %v", name, err) + } + return nil + }) + } + if err := g.Wait(); err != nil { + return err + } + } + return nil +} + +func deleteEvents(clientset *kubernetes.Clientset, labelSelector, namespace string, groupSize int) error { + var names []string + err := listEvents(clientset, labelSelector, namespace, defaultBatchSize, func(ev corev1.Event) error { + names = append(names, ev.Name) + return nil + }) + if err != nil { + return err + } + + if len(names) == 0 { + fmt.Printf("No events found in namespace %s, nothing to delete\n", namespace) + return nil + } + + n := len(names) + for i := 0; i < n; i += groupSize { + g := new(errgroup.Group) + for j := i; j < i+groupSize && j < n; j++ { + idx := j + g.Go(func() error { + err := clientset.CoreV1().Events(namespace).Delete(context.TODO(), names[idx], metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete event %s: %v", names[idx], err) + } + return nil + }) + } + if err := g.Wait(); err != nil { + return err + } + } + return nil +} + +func listEvents(clientset *kubernetes.Clientset, labelSelector, namespace string, limit int64, fn func(corev1.Event) error) error { + if limit <= 0 { + limit = defaultBatchSize + } + var continueToken string + for { + eventList, err := clientset.CoreV1().Events(namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: labelSelector, + Limit: limit, + Continue: continueToken, + }) + if err != nil { + return fmt.Errorf("failed to list events: %v", err) + } + + for _, ev := range eventList.Items { + if err := fn(ev); err != nil { + return err + } + } + + continueToken = eventList.Continue + if continueToken == "" { + break + } + } + return nil +} diff --git a/contrib/cmd/runkperf/commands/data/secrets/secret.go b/contrib/cmd/runkperf/commands/data/secrets/secret.go new file mode 100644 index 00000000..cd89b845 --- /dev/null +++ b/contrib/cmd/runkperf/commands/data/secrets/secret.go @@ -0,0 +1,390 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package secrets + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "text/tabwriter" + + "golang.org/x/sync/errgroup" + + "github.com/Azure/kperf/cmd/kperf/commands/utils" + "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data" + + "github.com/urfave/cli" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +// defaultBatchSize is the number of secrets to list or delete per batch. +// It is used as the page size for paginated API calls. +const defaultBatchSize int64 = 300 + +var Command = cli.Command{ + Name: "secret", + ShortName: "sec", + Usage: "Manage secrets", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "kubeconfig", + Usage: "Path to the kubeconfig file", + Value: utils.DefaultKubeConfigPath, + }, + cli.StringFlag{ + Name: "namespace", + Usage: "Namespace to use with commands. If the namespace does not exist, it will be created when executing add subcommand", + Value: "default", + }, + }, + Subcommands: []cli.Command{ + secretAddCommand, + secretDelCommand, + secretListCommand, + }, +} + +var secretAddCommand = cli.Command{ + Name: "add", + Usage: "Add secrets set", + ArgsUsage: "NAME of the secrets set", + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "size", + Usage: "The size of each secret value (Unit: Byte)", + Value: 100, + }, + cli.IntFlag{ + Name: "group-size", + Usage: "The number of secrets to create in parallel per batch", + Value: 30, + }, + cli.IntFlag{ + Name: "total", + Usage: "Total number of secrets to create", + Value: 10, + }, + cli.Float64Flag{ + Name: "qps", + Usage: "QPS for the Kubernetes client rate limiter to control secret operations", + Value: 100, + }, + cli.IntFlag{ + Name: "burst", + Usage: "Burst for the Kubernetes client rate limiter to control secret operations", + Value: 200, + }, + }, + Action: func(cliCtx *cli.Context) error { + if cliCtx.NArg() != 1 { + return fmt.Errorf("required only one argument as secrets set name: %v", cliCtx.Args()) + } + secName := strings.TrimSpace(cliCtx.Args().Get(0)) + if len(secName) == 0 { + return fmt.Errorf("required non-empty secret set name") + } + + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + size := cliCtx.Int("size") + groupSize := cliCtx.Int("group-size") + total := cliCtx.Int("total") + + err := checkSecretParams(size, groupSize, total) + if err != nil { + return err + } + + namespace := cliCtx.GlobalString("namespace") + qps := float32(cliCtx.Float64("qps")) + burst := cliCtx.Int("burst") + + clientset, err := data.NewClientsetWithRateLimiter(kubeCfgPath, qps, burst) + if err != nil { + return err + } + + err = data.PrepareNamespace(clientset, namespace) + if err != nil { + return err + } + + err = createSecrets(clientset, namespace, secName, size, groupSize, total) + if err != nil { + return err + } + fmt.Printf("Created secret set %s with size %d B, group-size %d, total %d\n", secName, size, groupSize, total) + return nil + }, +} + +var secretDelCommand = cli.Command{ + Name: "delete", + ShortName: "del", + ArgsUsage: "NAME", + Usage: "Delete a secrets set", + Flags: []cli.Flag{ + cli.Float64Flag{ + Name: "qps", + Usage: "QPS for the Kubernetes client rate limiter to control secret deletion", + Value: 100, + }, + cli.IntFlag{ + Name: "burst", + Usage: "Burst for the Kubernetes client rate limiter to control secret deletion", + Value: 200, + }, + cli.IntFlag{ + Name: "group-size", + Usage: "Number of secrets to delete in parallel per batch", + Value: 30, + }, + }, + Action: func(cliCtx *cli.Context) error { + if cliCtx.NArg() != 1 { + return fmt.Errorf("required only one secrets set name") + } + secName := strings.TrimSpace(cliCtx.Args().Get(0)) + if len(secName) == 0 { + return fmt.Errorf("required non-empty secrets set name") + } + + namespace := cliCtx.GlobalString("namespace") + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + qps := float32(cliCtx.Float64("qps")) + burst := cliCtx.Int("burst") + groupSize := cliCtx.Int("group-size") + if groupSize <= 0 { + return fmt.Errorf("group-size must be greater than 0") + } + labelSelector := fmt.Sprintf("app=%s,secName=%s", data.AppLabel, secName) + + clientset, err := data.NewClientsetWithRateLimiter(kubeCfgPath, qps, burst) + if err != nil { + return err + } + + err = deleteSecrets(clientset, labelSelector, namespace, groupSize) + if err != nil { + return err + } + + fmt.Printf("Deleted secrets set %s in %s namespace\n", secName, namespace) + return nil + }, +} + +var secretListCommand = cli.Command{ + Name: "list", + Usage: "List generated secrets. Lists all if no arguments are given; otherwise, provide secret set names separated by spaces.", + ArgsUsage: "[NAME ...]", + Action: func(cliCtx *cli.Context) error { + namespace := cliCtx.GlobalString("namespace") + kubeCfgPath := cliCtx.GlobalString("kubeconfig") + clientset, err := data.NewClientset(kubeCfgPath) + if err != nil { + return err + } + + const ( + minWidth = 1 + tabWidth = 12 + padding = 3 + padChar = ' ' + flags = 0 + ) + tw := tabwriter.NewWriter(os.Stdout, minWidth, tabWidth, padding, padChar, flags) + fmt.Fprintln(tw, "NAME\tSIZE\tGROUP_SIZE\tTOTAL\t") + + var labelSelector string + if cliCtx.NArg() == 0 { + labelSelector = fmt.Sprintf("app=%s", data.AppLabel) + } else { + args := cliCtx.Args() + namesStr := strings.Join(args, ",") + labelSelector = fmt.Sprintf("app=%s, secName in (%s)", data.AppLabel, namesStr) + } + + type secretSetInfo struct { + size int + total int + ownerID map[string]int // ownerID -> count of secrets in that group + } + secMap := make(map[string]*secretSetInfo) + err = listSecrets(clientset, labelSelector, namespace, defaultBatchSize, func(sec corev1.Secret) error { + name, ok := sec.Labels["secName"] + if !ok { + return fmt.Errorf("failed to find the secName of secret %s", sec.Name) + } + + info, ok := secMap[name] + if !ok { + info = &secretSetInfo{ + ownerID: make(map[string]int), + } + if d, exists := sec.Data["data"]; exists { + info.size = len(d) + } + secMap[name] = info + } + + info.total++ + + ownerID, ok := sec.Labels["ownerID"] + if !ok { + return fmt.Errorf("failed to find the ownerID of secret %s", sec.Name) + } + info.ownerID[ownerID]++ + return nil + }) + if err != nil { + return err + } + + for name, info := range secMap { + // Group size should be the max count across all ownerID groups + groupSize := 0 + for _, count := range info.ownerID { + if count > groupSize { + groupSize = count + } + } + fmt.Fprintf(tw, "%s\t%d\t%d\t%d\n", + name, + info.size, + groupSize, + info.total, + ) + } + return tw.Flush() + }, +} + +func checkSecretParams(size int, groupSize int, total int) error { + if size <= 0 { + return fmt.Errorf("size must be greater than 0") + } + if groupSize <= 0 { + return fmt.Errorf("group-size must be greater than 0") + } + if total <= 0 { + return fmt.Errorf("total amount must be greater than 0") + } + if groupSize > total { + return fmt.Errorf("group-size must be less than or equal to total") + } + return nil +} + +func createSecrets(clientset *kubernetes.Clientset, namespace string, secName string, size int, groupSize int, total int) error { + for i := 0; i < total; i += groupSize { + ownerID := i + g := new(errgroup.Group) + for j := i; j < i+groupSize && j < total; j++ { + idx := j + g.Go(func() error { + name := fmt.Sprintf("%s-sec-%s-%d", data.AppLabel, secName, idx) + + randData, err := data.RandBytes(size) + if err != nil { + return fmt.Errorf("failed to generate random data for secret %s: %v", name, err) + } + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "ownerID": strconv.Itoa(ownerID), + "app": data.AppLabel, + "secName": secName, + }, + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "data": randData, + }, + } + + _, err = clientset.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create secret %s: %v", name, err) + } + return nil + }) + } + if err := g.Wait(); err != nil { + return err + } + } + return nil +} + +func deleteSecrets(clientset *kubernetes.Clientset, labelSelector string, namespace string, groupSize int) error { + var names []string + err := listSecrets(clientset, labelSelector, namespace, defaultBatchSize, func(sec corev1.Secret) error { + names = append(names, sec.Name) + return nil + }) + if err != nil { + return err + } + + if len(names) == 0 { + fmt.Printf("No secrets found in namespace %s, nothing to delete\n", namespace) + return nil + } + + n := len(names) + for i := 0; i < n; i += groupSize { + g := new(errgroup.Group) + for j := i; j < i+groupSize && j < n; j++ { + idx := j + g.Go(func() error { + err := clientset.CoreV1().Secrets(namespace).Delete(context.TODO(), names[idx], metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete secret %s: %v", names[idx], err) + } + return nil + }) + } + if err := g.Wait(); err != nil { + return err + } + } + return nil +} + +func listSecrets(clientset *kubernetes.Clientset, labelSelector string, namespace string, limit int64, fn func(corev1.Secret) error) error { + if limit <= 0 { + limit = defaultBatchSize + } + var continueToken string + for { + secrets, err := clientset.CoreV1().Secrets(namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: labelSelector, + Limit: limit, + Continue: continueToken, + }) + if err != nil { + return fmt.Errorf("failed to list secrets: %v", err) + } + + for _, sec := range secrets.Items { + if err := fn(sec); err != nil { + return err + } + } + + continueToken = secrets.Continue + if continueToken == "" { + break + } + } + return nil +} diff --git a/contrib/cmd/runkperf/commands/root.go b/contrib/cmd/runkperf/commands/root.go index 50796d1f..ab866802 100644 --- a/contrib/cmd/runkperf/commands/root.go +++ b/contrib/cmd/runkperf/commands/root.go @@ -13,6 +13,8 @@ import ( "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data" "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data/configmaps" "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data/daemonsets" + "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data/events" + "github.com/Azure/kperf/contrib/cmd/runkperf/commands/data/secrets" "github.com/Azure/kperf/contrib/cmd/runkperf/commands/warmup" "github.com/urfave/cli" @@ -23,6 +25,8 @@ func init() { data.RegisterSubcommands( configmaps.Command, daemonsets.Command, + events.Command, + secrets.Command, ) }