Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
197 changes: 178 additions & 19 deletions cmd/machine-api-operator/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,50 @@ package main

import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"net/http"
"os"
"reflect"
"strconv"
"sync"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
coreclientsetv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

osconfigv1 "github.com/openshift/api/config/v1"
osclientset "github.com/openshift/client-go/config/clientset/versioned"
utiltls "github.com/openshift/controller-runtime-common/pkg/tls"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/machine-api-operator/pkg/metrics"
maometrics "github.com/openshift/machine-api-operator/pkg/metrics"
"github.com/openshift/machine-api-operator/pkg/operator"
"github.com/openshift/machine-api-operator/pkg/util"
"github.com/openshift/machine-api-operator/pkg/version"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

const (
// defaultMetricsPort is the default port to expose metrics.
defaultMetricsPort = 8080
defaultMetricsPort = 8443
metricsCertDir = "/etc/tls/private"
metricsCertFile = "tls.crt"
metricsKeyFile = "tls.key"
)

var (
Expand Down Expand Up @@ -82,33 +94,50 @@ func runStartCmd(cmd *cobra.Command, args []string) error {
return fmt.Errorf("error creating clients: %v", err)
}
stopCh := make(chan struct{})
leaderElectionCtx, leaderElectionCancel := context.WithCancel(context.Background())
var shutdownOnce sync.Once
var shuttingDown atomic.Bool
shutdown := func() {
shutdownOnce.Do(func() {
shuttingDown.Store(true)
close(stopCh)
leaderElectionCancel()
})
}

le := util.GetLeaderElectionConfig(cb.config, osconfigv1.LeaderElection{})

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
leaderelection.RunOrDie(leaderElectionCtx, leaderelection.LeaderElectionConfig{
Lock: CreateResourceLock(cb, componentNamespace, componentName),
RenewDeadline: le.RenewDeadline.Duration,
RetryPeriod: le.RetryPeriod.Duration,
LeaseDuration: le.LeaseDuration.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
ctrlCtx := CreateControllerContext(cb, stopCh, componentNamespace)
if err := setupTLSProfileWatcher(ctrlCtx, shutdown); err != nil {
klog.Fatalf("Unable to set up TLS profile watcher: %v", err)
}
startControllersOrDie(ctrlCtx)
ctrlCtx.KubeNamespacedInformerFactory.Start(ctrlCtx.Stop)
ctrlCtx.ConfigInformerFactory.Start(ctrlCtx.Stop)
initMachineAPIInformers(ctrlCtx)
startMetricsCollectionAndServer(ctrlCtx)
close(ctrlCtx.InformersStarted)

select {}
<-stopCh
},
OnStoppedLeading: func() {
if shuttingDown.Load() {
klog.Info("Leader election stopped due to shutdown")
return
}
klog.Fatalf("Leader election lost")
},
},
ReleaseOnCancel: true,
})
panic("unreachable")
return nil
}

func initMachineAPIInformers(ctx *ControllerContext) {
Expand Down Expand Up @@ -182,11 +211,11 @@ func startControllersOrDie(ctx *ControllerContext) {
func startMetricsCollectionAndServer(ctx *ControllerContext) {
machineInformer := ctx.MachineInformerFactory.Machine().V1beta1().Machines()
machinesetInformer := ctx.MachineInformerFactory.Machine().V1beta1().MachineSets()
machineMetricsCollector := metrics.NewMachineCollector(
machineMetricsCollector := maometrics.NewMachineCollector(
machineInformer,
machinesetInformer,
componentNamespace)
prometheus.MustRegister(machineMetricsCollector)
ctrlmetrics.Registry.MustRegister(machineMetricsCollector)
metricsPort := defaultMetricsPort
if port, ok := os.LookupEnv("METRICS_PORT"); ok {
v, err := strconv.Atoi(port)
Expand All @@ -195,17 +224,147 @@ func startMetricsCollectionAndServer(ctx *ControllerContext) {
}
metricsPort = v
}
klog.V(4).Info("Starting server to serve prometheus metrics")
go startHTTPMetricServer(fmt.Sprintf("localhost:%d", metricsPort))
klog.V(4).Info("Starting secure metrics server")
tlsOpts, err := metricsTLSOptions(ctx)
if err != nil {
klog.Fatalf("Unable to configure metrics TLS: %v", err)
}
metricsServer, err := newSecureMetricsServer(
ctx,
fmt.Sprintf(":%d", metricsPort),
tlsOpts,
)
if err != nil {
klog.Fatalf("Unable to initialize secure metrics server: %v", err)
}

metricsServerCtx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Stop
cancel()
}()

go func() {
if err := metricsServer.Start(metricsServerCtx); err != nil {
if errors.Is(err, context.Canceled) {
klog.V(2).Info("Secure metrics server shutdown complete")
return
}
klog.Fatalf("Unable to start secure metrics server: %v", err)
}
}()
}

func metricsTLSOptions(ctx *ControllerContext) ([]func(*tls.Config), error) {
scheme := runtime.NewScheme()
if err := osconfigv1.Install(scheme); err != nil {
return nil, fmt.Errorf("unable to add config.openshift.io scheme: %w", err)
}

k8sClient, err := client.New(ctx.ClientBuilder.config, client.Options{Scheme: scheme})
if err != nil {
return nil, fmt.Errorf("unable to create Kubernetes client: %w", err)
}

tlsSecurityProfileSpec, err := utiltls.FetchAPIServerTLSProfile(context.Background(), k8sClient)
if err != nil {
return nil, fmt.Errorf("unable to get TLS profile from API server: %w", err)
}

tlsConfigFn, unsupportedCiphers := utiltls.NewTLSConfigFromProfile(tlsSecurityProfileSpec)
if len(unsupportedCiphers) > 0 {
klog.Infof("TLS configuration contains unsupported ciphers that will be ignored: %v", unsupportedCiphers)
}

return []func(*tls.Config){tlsConfigFn}, nil
}

func startHTTPMetricServer(metricsPort string) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
func newSecureMetricsServer(ctx *ControllerContext, metricsAddr string, tlsOpts []func(*tls.Config)) (metricsserver.Server, error) {
httpClient, err := rest.HTTPClientFor(ctx.ClientBuilder.config)
if err != nil {
return nil, fmt.Errorf("unable to create HTTP client for metrics authn/authz: %w", err)
}

return metricsserver.NewServer(metricsserver.Options{
BindAddress: metricsAddr,
SecureServing: true,
FilterProvider: filters.WithAuthenticationAndAuthorization,
CertDir: metricsCertDir,
CertName: metricsCertFile,
KeyName: metricsKeyFile,
TLSOpts: tlsOpts,
}, ctx.ClientBuilder.config, httpClient)
}

server := &http.Server{
Addr: metricsPort,
Handler: mux,
func setupTLSProfileWatcher(ctx *ControllerContext, shutdown func()) error {
configClient := ctx.ClientBuilder.OpenshiftClientOrDie("tls-profile-watcher")
initialProfile, err := fetchAPIServerTLSProfileSpec(context.Background(), configClient)
if err != nil {
return err
}
klog.Fatal(server.ListenAndServe())

apiServerInformer := ctx.ConfigInformerFactory.Config().V1().APIServers().Informer()
_, err = apiServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handleTLSProfileEvent(obj, &initialProfile, shutdown)
},
UpdateFunc: func(_, newObj interface{}) {
handleTLSProfileEvent(newObj, &initialProfile, shutdown)
},
DeleteFunc: func(obj interface{}) {
handleTLSProfileEvent(obj, &initialProfile, shutdown)
},
Comment on lines +299 to +316
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify control flow: read the key sections mentioned in the review
echo "=== Section 1: Lines 96-140 (startup context) ==="
sed -n '96,140p' cmd/machine-api-operator/start.go

echo -e "\n=== Section 2: Lines 143-153 (cache sync with fatal) ==="
sed -n '143,153p' cmd/machine-api-operator/start.go

echo -e "\n=== Section 3: Lines 299-370 (setupTLSProfileWatcher) ==="
sed -n '299,370p' cmd/machine-api-operator/start.go

Repository: openshift/machine-api-operator

Length of output: 4655


Prevent fatal crash when TLS watcher triggers shutdown before cache sync completes.

shutdown() can be called from APIServer events after the TLS watcher is registered but before initMachineAPIInformers() cache sync finishes. This closes stopCh, causing cache.WaitForCacheSync() to return false, which unconditionally triggers klog.Fatal() at line 150—converting an intended graceful restart into a hard crash.

Suggested hardening
func initMachineAPIInformers(ctx *ControllerContext) {
	mInformer := ctx.MachineInformerFactory.Machine().V1beta1().Machines().Informer()
	msInformer := ctx.MachineInformerFactory.Machine().V1beta1().MachineSets().Informer()
	ctx.MachineInformerFactory.Start(ctx.Stop)
	if !cache.WaitForCacheSync(ctx.Stop,
		mInformer.HasSynced,
		msInformer.HasSynced) {
+		select {
+		case <-ctx.Stop:
+			klog.V(2).Info("Skipping Machine API informer sync due to shutdown")
+			return
+		default:
+			klog.Fatal("Failed to sync caches for Machine api informers")
+		}
	}
	klog.Info("Synced up machine api informer caches")
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/machine-api-operator/start.go` around lines 299 - 316, The code must
avoid converting a shutdown-triggered cache sync failure into a hard crash: in
the initMachineAPIInformers() path where cache.WaitForCacheSync(...) is
currently followed by klog.Fatal on false, change the logic to detect a shutdown
caused by setupTLSProfileWatcher/handleTLSProfileEvent calling shutdown() (i.e.,
the stop channel being closed or context cancelled) and return an error or nil
to allow graceful restart instead of calling klog.Fatal; update
initMachineAPIInformers() to check the stop channel or context before deciding
to fatal, and ensure setupTLSProfileWatcher/handleTLSProfileEvent continue to
call shutdown() as before.

})
if err != nil {
return fmt.Errorf("failed to add APIServer event handler: %w", err)
}

return nil
}

func fetchAPIServerTLSProfileSpec(ctx context.Context, configClient osclientset.Interface) (osconfigv1.TLSProfileSpec, error) {
apiServer, err := configClient.ConfigV1().APIServers().Get(ctx, utiltls.APIServerName, metav1.GetOptions{})
if err != nil {
return osconfigv1.TLSProfileSpec{}, fmt.Errorf("failed to get APIServer %q: %w", utiltls.APIServerName, err)
}

profile, err := utiltls.GetTLSProfileSpec(apiServer.Spec.TLSSecurityProfile)
if err != nil {
return osconfigv1.TLSProfileSpec{}, fmt.Errorf("failed to get TLS profile from APIServer %q: %w", utiltls.APIServerName, err)
}

return profile, nil
}

func handleTLSProfileEvent(obj interface{}, initialProfile *osconfigv1.TLSProfileSpec, shutdown func()) {
apiServer, ok := obj.(*osconfigv1.APIServer)
if !ok {
return
}
if apiServer.Name != utiltls.APIServerName {
return
}

currentProfile, err := utiltls.GetTLSProfileSpec(apiServer.Spec.TLSSecurityProfile)
if err != nil {
klog.Errorf("Failed to get TLS profile from APIServer %q: %v", apiServer.Name, err)
return
}

if reflect.DeepEqual(*initialProfile, currentProfile) {
klog.V(2).Info("TLS security profile unchanged")
return
}

klog.Infof("TLS security profile has changed, initiating a shutdown to pick up the new configuration: initialMinTLSVersion=%s currentMinTLSVersion=%s initialCiphers=%v currentCiphers=%v",
initialProfile.MinTLSVersion,
currentProfile.MinTLSVersion,
initialProfile.Ciphers,
currentProfile.Ciphers,
)

// Persist the new profile for future change detection.
*initialProfile = currentProfile

shutdown()
}
21 changes: 21 additions & 0 deletions cmd/machineset/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"crypto/tls"
"flag"
"fmt"
"log"
Expand All @@ -29,6 +30,7 @@ import (
osconfigv1 "github.com/openshift/api/config/v1"
apifeatures "github.com/openshift/api/features"
machinev1 "github.com/openshift/api/machine/v1beta1"
utiltls "github.com/openshift/controller-runtime-common/pkg/tls"
mapiwebhooks "github.com/openshift/machine-api-operator/pkg/webhooks"

"k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -85,6 +87,12 @@ func main() {
webhookCertdir := flag.String("webhook-cert-dir", defaultWebhookCertdir,
"Webhook cert dir, only used when webhook-enabled is true.")

tlsCipherSuites := flag.String("tls-cipher-suites", "",
"Comma-separated list of TLS cipher suites.")

tlsMinVersion := flag.String("tls-min-version", "",
"Minimum TLS version supported.")

healthAddr := flag.String(
"health-addr",
":9441",
Expand Down Expand Up @@ -159,9 +167,22 @@ func main() {
}

if *webhookEnabled {
tlsProfile := osconfigv1.TLSProfileSpec{
MinTLSVersion: osconfigv1.TLSProtocolVersion(*tlsMinVersion),
}
if *tlsCipherSuites != "" {
tlsProfile.Ciphers = strings.Split(*tlsCipherSuites, ",")
}

tlsOpts, unsupportedCiphers := utiltls.NewTLSConfigFromProfile(tlsProfile)
if len(unsupportedCiphers) > 0 {
klog.Infof("TLS configuration contains unsupported ciphers that will be ignored: %v", unsupportedCiphers)
}

opts.WebhookServer = webhook.NewServer(webhook.Options{
Port: *webhookPort,
CertDir: *webhookCertdir,
TLSOpts: []func(*tls.Config){tlsOpts},
})
}

Expand Down
Loading