Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 200 additions & 9 deletions azureappconfiguration/azureappconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,32 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/Azure/AppConfiguration-GoProvider/azureappconfiguration/internal/refresh"
"github.com/Azure/AppConfiguration-GoProvider/azureappconfiguration/internal/tracing"
"github.com/Azure/AppConfiguration-GoProvider/azureappconfiguration/internal/tree"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
decoder "github.com/go-viper/mapstructure/v2"
"golang.org/x/sync/errgroup"
)

// An AzureAppConfiguration is a configuration provider that stores and manages settings sourced from Azure App Configuration.
type AzureAppConfiguration struct {
keyValues map[string]any
kvSelectors []Selector
trimPrefixes []string
keyValues map[string]any
kvSelectors []Selector
trimPrefixes []string
watchedSettings []WatchedSetting

sentinelETags map[WatchedSetting]*azcore.ETag
kvRefreshTimer refresh.Condition
onRefreshSuccess []func()
tracingOptions tracing.Options

clientManager *configurationClientManager
resolver *keyVaultReferenceResolver

tracingOptions tracing.Options
refreshInProgress atomic.Bool
}

// Load initializes a new AzureAppConfiguration instance and loads the configuration data from
Expand All @@ -56,6 +65,10 @@ func Load(ctx context.Context, authentication AuthenticationOptions, options *Op
return nil, err
}

if err := verifyOptions(options); err != nil {
return nil, err
}

if options == nil {
options = &Options{}
}
Expand All @@ -77,9 +90,17 @@ func Load(ctx context.Context, authentication AuthenticationOptions, options *Op
credential: options.KeyVaultOptions.Credential,
}

if options.RefreshOptions.Enabled {
azappcfg.kvRefreshTimer = refresh.NewTimer(options.RefreshOptions.Interval)
azappcfg.watchedSettings = normalizedWatchedSettings(options.RefreshOptions.WatchedSettings)
azappcfg.sentinelETags = make(map[WatchedSetting]*azcore.ETag)
}

if err := azappcfg.load(ctx); err != nil {
return nil, err
}
// Set the initial load finished flag
azappcfg.tracingOptions.InitialLoadFinished = true

return azappcfg, nil
}
Expand Down Expand Up @@ -150,14 +171,107 @@ func (azappcfg *AzureAppConfiguration) GetBytes(options *ConstructionOptions) ([
return json.Marshal(azappcfg.constructHierarchicalMap(options.Separator))
}

// Refresh manually triggers a refresh of the configuration from Azure App Configuration.
// It checks if any watched settings have changed, and if so, reloads all configuration data.
//
// The refresh only occurs if:
// - Refresh has been configured with RefreshOptions when the client was created
// - The configured refresh interval has elapsed since the last refresh
// - No other refresh operation is currently in progress
//
// If the configuration has changed, any callback functions registered with OnRefreshSuccess will be executed.
//
// Parameters:
// - ctx: The context for the operation.
//
// Returns:
// - An error if refresh is not configured, or if the refresh operation fails
func (azappcfg *AzureAppConfiguration) Refresh(ctx context.Context) error {
if azappcfg.kvRefreshTimer == nil {
return fmt.Errorf("refresh is not configured")
}

// Try to set refreshInProgress to true, returning false if it was already true
if !azappcfg.refreshInProgress.CompareAndSwap(false, true) {
return nil // Another refresh is already in progress
}

// Reset the flag when we're done
defer azappcfg.refreshInProgress.Store(false)

// Check if it's time to perform a refresh based on the timer interval
if !azappcfg.kvRefreshTimer.ShouldRefresh() {
return nil
}

// Attempt to refresh and check if any values were actually updated
refreshed, err := azappcfg.refreshKeyValues(ctx, azappcfg.newKeyValueRefreshClient())
if err != nil {
return fmt.Errorf("failed to refresh configuration: %w", err)
}

// Only execute callbacks if actual changes were applied
if refreshed {
for _, callback := range azappcfg.onRefreshSuccess {
if callback != nil {
callback()
}
}
}

return nil
}

// OnRefreshSuccess registers a callback function that will be executed whenever the configuration
// is successfully refreshed and actual changes were detected.
//
// Multiple callback functions can be registered, and they will be executed in the order they were added.
// Callbacks are only executed when configuration values actually change. They run synchronously
// in the thread that initiated the refresh.
//
// Parameters:
// - callback: A function with no parameters that will be called after a successful refresh
func (azappcfg *AzureAppConfiguration) OnRefreshSuccess(callback func()) {
azappcfg.onRefreshSuccess = append(azappcfg.onRefreshSuccess, callback)
}

func (azappcfg *AzureAppConfiguration) load(ctx context.Context) error {
keyValuesClient := &selectorSettingsClient{
selectors: azappcfg.kvSelectors,
client: azappcfg.clientManager.staticClient.client,
tracingOptions: azappcfg.tracingOptions,
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
keyValuesClient := &selectorSettingsClient{
selectors: azappcfg.kvSelectors,
client: azappcfg.clientManager.staticClient.client,
tracingOptions: azappcfg.tracingOptions,
}
return azappcfg.loadKeyValues(egCtx, keyValuesClient)
})

if azappcfg.kvRefreshTimer != nil && len(azappcfg.watchedSettings) > 0 {
eg.Go(func() error {
watchedClient := &watchedSettingClient{
watchedSettings: azappcfg.watchedSettings,
client: azappcfg.clientManager.staticClient.client,
tracingOptions: azappcfg.tracingOptions,
}
return azappcfg.loadWatchedSettings(egCtx, watchedClient)
})
}

return eg.Wait()
}

func (azappcfg *AzureAppConfiguration) loadWatchedSettings(ctx context.Context, settingsClient settingsClient) error {
settingsResponse, err := settingsClient.getSettings(ctx)
if err != nil {
return err
}

return azappcfg.loadKeyValues(ctx, keyValuesClient)
// Store ETags for all watched settings
if settingsResponse != nil && settingsResponse.watchedETags != nil {
azappcfg.sentinelETags = settingsResponse.watchedETags
}

return nil
}

func (azappcfg *AzureAppConfiguration) loadKeyValues(ctx context.Context, settingsClient settingsClient) error {
Expand Down Expand Up @@ -246,6 +360,48 @@ func (azappcfg *AzureAppConfiguration) loadKeyValues(ctx context.Context, settin
return nil
}

// refreshKeyValues checks if any watched settings have changed and reloads configuration if needed
// Returns true if configuration was actually refreshed, false otherwise
func (azappcfg *AzureAppConfiguration) refreshKeyValues(ctx context.Context, refreshClient refreshClient) (bool, error) {
// Check if any ETags have changed
eTagChanged, err := refreshClient.monitor.checkIfETagChanged(ctx)
if err != nil {
return false, fmt.Errorf("failed to check if watched settings have changed: %w", err)
}

if !eTagChanged {
// No changes detected, reset timer and return
azappcfg.kvRefreshTimer.Reset()
return false, nil
}

// Use an errgroup to reload key values and watched settings concurrently
eg, egCtx := errgroup.WithContext(ctx)

// Reload key values in one goroutine
eg.Go(func() error {
settingsClient := refreshClient.loader
return azappcfg.loadKeyValues(egCtx, settingsClient)
})

if len(azappcfg.watchedSettings) > 0 {
eg.Go(func() error {
watchedClient := refreshClient.sentinels
return azappcfg.loadWatchedSettings(egCtx, watchedClient)
})
}

// Wait for all reloads to complete
if err := eg.Wait(); err != nil {
// Don't reset the timer if reload failed
return false, fmt.Errorf("failed to reload configuration: %w", err)
}

// Reset the timer only after successful refresh
azappcfg.kvRefreshTimer.Reset()
return true, nil
}

func (azappcfg *AzureAppConfiguration) trimPrefix(key string) string {
result := key
for _, prefix := range azappcfg.trimPrefixes {
Expand Down Expand Up @@ -324,3 +480,38 @@ func configureTracingOptions(options *Options) tracing.Options {

return tracingOption
}

func normalizedWatchedSettings(s []WatchedSetting) []WatchedSetting {
result := make([]WatchedSetting, len(s))
for i, setting := range s {
// Make a copy of the setting
normalizedSetting := setting
if normalizedSetting.Label == "" {
normalizedSetting.Label = defaultLabel
}

result[i] = normalizedSetting
}

return result
}

func (azappcfg *AzureAppConfiguration) newKeyValueRefreshClient() refreshClient {
return refreshClient{
loader: &selectorSettingsClient{
selectors: azappcfg.kvSelectors,
client: azappcfg.clientManager.staticClient.client,
tracingOptions: azappcfg.tracingOptions,
},
monitor: &watchedSettingClient{
eTags: azappcfg.sentinelETags,
client: azappcfg.clientManager.staticClient.client,
tracingOptions: azappcfg.tracingOptions,
},
sentinels: &watchedSettingClient{
watchedSettings: azappcfg.watchedSettings,
client: azappcfg.clientManager.staticClient.client,
tracingOptions: azappcfg.tracingOptions,
},
}
}
8 changes: 8 additions & 0 deletions azureappconfiguration/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package azureappconfiguration

import "time"

// Configuration client constants
const (
endpointKey string = "Endpoint"
Expand All @@ -18,3 +20,9 @@ const (
secretReferenceContentType string = "application/vnd.microsoft.appconfig.keyvaultref+json;charset=utf-8"
featureFlagContentType string = "application/vnd.microsoft.appconfig.ff+json;charset=utf-8"
)

// Refresh interval constants
const (
// minimalRefreshInterval is the minimum allowed refresh interval for key-value settings
minimalRefreshInterval time.Duration = time.Second
)
46 changes: 46 additions & 0 deletions azureappconfiguration/internal/refresh/refresh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package refresh

import "time"

// Timer manages the timing for refresh operations
type Timer struct {
interval time.Duration // How often refreshes should occur
nextRefreshTime time.Time // When the next refresh should occur
}

// Condition interface defines the methods a refresh timer should implement
type Condition interface {
ShouldRefresh() bool
Reset()
}

const (
DefaultRefreshInterval time.Duration = 30 * time.Second
)

// NewTimer creates a new refresh timer with the specified interval
// If interval is zero or negative, it falls back to the DefaultRefreshInterval
func NewTimer(interval time.Duration) *Timer {
// Use default interval if not specified or invalid
if interval <= 0 {
interval = DefaultRefreshInterval
}

return &Timer{
interval: interval,
nextRefreshTime: time.Now().Add(interval),
}
}

// ShouldRefresh checks whether it's time for a refresh
func (rt *Timer) ShouldRefresh() bool {
return !time.Now().Before(rt.nextRefreshTime)
}

// Reset resets the timer for the next refresh cycle
func (rt *Timer) Reset() {
rt.nextRefreshTime = time.Now().Add(rt.interval)
}
14 changes: 6 additions & 8 deletions azureappconfiguration/internal/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type RequestType string

type RequestTracingKey string
type HostType string

const (
Expand All @@ -32,7 +32,6 @@ const (
// Documentation : https://docs.microsoft.com/en-us/azure/service-fabric/service-fabric-environment-variables-reference
EnvVarServiceFabric = "Fabric_NodeName"

RequestTracingKey = "Tracing"
RequestTypeKey = "RequestType"
HostTypeKey = "Host"
KeyVaultConfiguredTag = "UsesKeyVault"
Expand All @@ -50,6 +49,7 @@ const (

type Options struct {
Enabled bool
InitialLoadFinished bool
Host HostType
KeyVaultConfigured bool
UseAIConfiguration bool
Expand All @@ -75,12 +75,10 @@ func CreateCorrelationContextHeader(ctx context.Context, options Options) http.H
header := http.Header{}
output := make([]string, 0)

if tracing := ctx.Value(RequestTracingKey); tracing != nil {
if tracing.(RequestType) == RequestTypeStartUp {
output = append(output, RequestTypeKey+"="+string(RequestTypeStartUp))
} else if tracing.(RequestType) == RequestTypeWatch {
output = append(output, RequestTypeKey+"="+string(RequestTypeWatch))
}
if !options.InitialLoadFinished {
output = append(output, RequestTypeKey+"="+string(RequestTypeStartUp))
} else {
output = append(output, RequestTypeKey+"="+string(RequestTypeWatch))
}

if options.Host != "" {
Expand Down
Loading
Loading