Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 91 additions & 8 deletions plugins/ptp_operator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ func (pt *PtpClockThreshold) SafeClose() (justClosed bool) {
}()
select {
case <-pt.Close:
log.Debugf("cancelling holdover timer: channel already closed")
default:
log.Debugf("cancelling holdover timer")
close(pt.Close) // close any holdover go routines
}
return true // <=> justClosed = true; return
Expand Down Expand Up @@ -177,7 +179,7 @@ func (l *LinuxPTPConfigMapUpdate) SetAppliedNodeProfileJSON(appliedNodeProfileJS
func NewLinuxPTPConfUpdate() *LinuxPTPConfigMapUpdate {
ptpProfileUpdate := &LinuxPTPConfigMapUpdate{
lock: sync.RWMutex{},
UpdateCh: make(chan bool),
UpdateCh: make(chan bool, 1),
profilePath: DefaultProfilePath,
intervalUpdate: DefaultUpdateInterval,
EventThreshold: make(map[string]*PtpClockThreshold),
Expand Down Expand Up @@ -219,24 +221,39 @@ func (p *PtpProfile) GetInterface() (interfaces []*string) {

// DeletePTPThreshold ... delete threshold for profile
func (l *LinuxPTPConfigMapUpdate) DeletePTPThreshold(name string) {
l.lock.Lock()
defer l.lock.Unlock()
if t, found := l.EventThreshold[name]; found {
l.lock.Lock()
closeHoldover(t)
delete(l.EventThreshold, name)
l.lock.Unlock()
}
}

// DeleteAllPTPThreshold ... delete all threshold per config
func (l *LinuxPTPConfigMapUpdate) DeleteAllPTPThreshold() {
l.lock.Lock()
defer l.lock.Unlock()
for k, t := range l.EventThreshold {
l.lock.Lock()
closeHoldover(t)
delete(l.EventThreshold, k)
l.lock.Unlock()
}
}

// LookupEventThreshold returns the PtpClockThreshold for profileName under read
// lock, falling back to the first available entry when profileName is not found.
// Returns nil if EventThreshold is empty.
func (l *LinuxPTPConfigMapUpdate) LookupEventThreshold(profileName string) *PtpClockThreshold {
l.lock.RLock()
defer l.lock.RUnlock()
if t, found := l.EventThreshold[profileName]; found {
return t
}
for _, t := range l.EventThreshold {
return t
}
return nil
}

func closeHoldover(t *PtpClockThreshold) {
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -366,7 +383,7 @@ func (l *LinuxPTPConfigMapUpdate) UpdateConfig(nodeProfilesJSON []byte) (bool, e
l.NodeProfiles[index].Interfaces = np.GetInterface()
l.NodeProfiles[index].PtpClockThreshold = np.PtpClockThreshold
}
l.UpdateCh <- true
l.notifyUpdate()
return true, nil
}

Expand All @@ -380,12 +397,21 @@ func (l *LinuxPTPConfigMapUpdate) UpdateConfig(nodeProfilesJSON []byte) (bool, e
log.Info("load profiles using old method")
l.appliedNodeProfileJSON = nodeProfilesJSON
l.NodeProfiles = nodeProfiles
l.UpdateCh <- true
l.notifyUpdate()
return true, nil
}
return false, fmt.Errorf("unable to load profile config")
}

// notifyUpdate signals config watchers without blocking when no receiver is ready.
func (l *LinuxPTPConfigMapUpdate) notifyUpdate() {
select {
case l.UpdateCh <- true:
default:
log.Debug("coalescing config update notification")
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Try to load the multiple policy config
func tryToLoadConfig(nodeProfilesJSON []byte) ([]PtpProfile, bool) {
var ptpConfig []PtpProfile
Expand Down Expand Up @@ -438,7 +464,7 @@ func (l *LinuxPTPConfigMapUpdate) updatePtpConfig(nodeName string) (updated bool
if _, err := filesystem.Stat(nodeProfile); err != nil {
if os.IsNotExist(err) {
log.Infof("ptp profile %s doesn't exist for node: %v , error %s", nodeProfile, nodeName, err.Error())
l.UpdateCh <- true // if profile doesn't exist let the caller know
l.notifyUpdate() // if profile doesn't exist let the caller know
return
}
log.Errorf("error finding node profile %v: %v", nodeName, err)
Expand All @@ -461,6 +487,63 @@ func (l *LinuxPTPConfigMapUpdate) updatePtpConfig(nodeName string) (updated bool
return
}

// EnsureProcessOptions reloads the node profile file when needed and populates
// PtpProcessOpts if NodeProfiles are available but the process-options cache is empty.
// This handles startup ordering where the one-shot configmap watcher exits before the
// profile mount is ready, leaving PtpProcessOpts unpopulated when ptp4l configs arrive.
func (l *LinuxPTPConfigMapUpdate) EnsureProcessOptions(nodeName string) {
if len(l.NodeProfiles) == 0 {
l.updatePtpConfig(nodeName)
}

l.lock.Lock()
defer l.lock.Unlock()

if len(l.NodeProfiles) == 0 {
return
}
if len(l.PtpProcessOpts) > 0 {
return
}
l.UpdatePTPProcessOptions()
l.UpdatePTPThreshold()
l.UpdatePTPSetting()
}

// LookupPtpProcessOpts returns cached process options for a profile under read lock.
func (l *LinuxPTPConfigMapUpdate) LookupPtpProcessOpts(profileName string) *PtpProcessOpts {
if profileName == "" {
return nil
}
l.lock.RLock()
opts := l.PtpProcessOpts[profileName]
l.lock.RUnlock()
return opts
}

// LookupOrEnsurePtpProcessOpts returns process options for profileName, repopulating
// the cache under lock when it is still empty. updatePtpConfig runs outside the lock
// because it performs filesystem I/O.
func (l *LinuxPTPConfigMapUpdate) LookupOrEnsurePtpProcessOpts(nodeName, profileName string) *PtpProcessOpts {
if profileName == "" {
return nil
}

l.lock.RLock()
if opts, ok := l.PtpProcessOpts[profileName]; ok {
l.lock.RUnlock()
return opts
}
cacheEmpty := len(l.PtpProcessOpts) == 0
l.lock.RUnlock()

if cacheEmpty {
l.EnsureProcessOptions(nodeName)
}

return l.LookupPtpProcessOpts(profileName)
}

// GetDefaultThreshold ... get default threshold
func GetDefaultThreshold() PtpClockThreshold {
return PtpClockThreshold{
Expand Down
75 changes: 52 additions & 23 deletions plugins/ptp_operator/metrics/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,20 @@ func NewPTPEventManager(resourcePrefix string, publisherTypes map[ptp.EventType]
// PtpThreshold ... return ptp threshold
// resetCh will reset any closed channel
func (p *PTPEventManager) PtpThreshold(profileName string, resetCh bool) ptpConfig.PtpClockThreshold {
if t, found := p.PtpConfigMapUpdates.EventThreshold[profileName]; found {
if resetCh {
t.Close = make(chan struct{}) // reset channel to new
}
return ptpConfig.PtpClockThreshold{
HoldOverTimeout: t.HoldOverTimeout,
MaxOffsetThreshold: t.MaxOffsetThreshold,
MinOffsetThreshold: t.MinOffsetThreshold,
Close: t.Close,
}
} else if len(p.PtpConfigMapUpdates.EventThreshold) > 0 { // if not found get the first item since one per config)
for _, t := range p.PtpConfigMapUpdates.EventThreshold {
if resetCh {
t.Close = make(chan struct{})
}
return ptpConfig.PtpClockThreshold{
HoldOverTimeout: t.HoldOverTimeout,
MaxOffsetThreshold: t.MaxOffsetThreshold,
MinOffsetThreshold: t.MinOffsetThreshold,
Close: t.Close,
}
}
t := p.PtpConfigMapUpdates.LookupEventThreshold(profileName)
if t == nil {
return ptpConfig.GetDefaultThreshold()
}
if resetCh {
t.SafeClose()
t.Close = make(chan struct{})
}
return ptpConfig.PtpClockThreshold{
HoldOverTimeout: t.HoldOverTimeout,
MaxOffsetThreshold: t.MaxOffsetThreshold,
MinOffsetThreshold: t.MinOffsetThreshold,
Close: t.Close,
}
return ptpConfig.GetDefaultThreshold()
}

// MockTest ... use for test only
Expand Down Expand Up @@ -472,6 +462,7 @@ func (p *PTPEventManager) GenPTPEvent(ptpProfileName string, oStats *stats.Stats
// previous state was HOLDOVER, now it is in LOCKED state, cancel any HOLDOVER
if isOffsetInRange(ptpOffset, threshold.MaxOffsetThreshold, threshold.MinOffsetThreshold) {
log.Infof("interface %s is in LOCKED state, cancel any holdover states", eventResourceName)
log.Debugf("cancelling holdover timer: profile=%s resource=%s", ptpProfileName, eventResourceName)
threshold.SafeClose()
log.Infof(" publishing event for ( profile %s) %s with last state %s and current clock state %s and offset %d for ( Max/Min Threshold %d/%d )",
ptpProfileName, eventResourceName, lastClockState, clockState, ptpOffset, threshold.MaxOffsetThreshold, threshold.MinOffsetThreshold)
Expand Down Expand Up @@ -540,6 +531,44 @@ func (p *PTPEventManager) NodeName() string {
return p.nodeName
}

// GetPtpProcessOpts returns process options for the given profile, resolving the
// profile from the registered config when needed and refreshing the node profile
// cache when PtpProcessOpts was never populated due to startup ordering.
func (p *PTPEventManager) GetPtpProcessOpts(profileName, configName string) *ptpConfig.PtpProcessOpts {
profileName = p.resolveProfileName(profileName, configName)
if !p.mock {
if opts := p.PtpConfigMapUpdates.LookupOrEnsurePtpProcessOpts(p.nodeName, profileName); opts != nil {
return opts
}
} else if opts := p.PtpConfigMapUpdates.LookupPtpProcessOpts(profileName); opts != nil {
return opts
}
if configName != "" {
if cfg := p.GetPTPConfig(types.ConfigName(configName)); cfg != nil && cfg.Profile != "" {
if !p.mock {
if opts := p.PtpConfigMapUpdates.LookupOrEnsurePtpProcessOpts(p.nodeName, cfg.Profile); opts != nil {
return opts
}
}
return p.PtpConfigMapUpdates.LookupPtpProcessOpts(cfg.Profile)
}
}
return nil
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

func (p *PTPEventManager) resolveProfileName(profileName, configName string) string {
if profileName != "" {
return profileName
}
if configName == "" {
return ""
}
if cfg := p.GetPTPConfig(types.ConfigName(configName)); cfg != nil {
return cfg.Profile
}
return ""
}

// GetMockEvent ...
func (p *PTPEventManager) GetMockEvent() []ptp.EventType {
return p.mockEvent
Expand Down
2 changes: 1 addition & 1 deletion plugins/ptp_operator/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (p *PTPEventManager) processDownEvent(profileName, processName string, ptpS
}
}
if s, ok := ptpStats[ClockRealTime]; ok {
if t, ok2 := p.PtpConfigMapUpdates.PtpProcessOpts[profileName]; ok2 && t.Phc2SysEnabled() {
if t := p.PtpConfigMapUpdates.LookupPtpProcessOpts(profileName); t != nil && t.Phc2SysEnabled() {
p.GenPTPEvent(profileName, s, ClockRealTime, FreeRunOffsetValue, ptp.FREERUN, ptp.OsClockSyncStateChange)
}
}
Expand Down
71 changes: 47 additions & 24 deletions plugins/ptp_operator/metrics/ptp4lParse.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,34 @@ func (p *PTPEventManager) ParsePTP4l(processName, configName, profileName, outpu
ptpStats[master].SetLastSyncState(syncState)
p.PublishEvent(syncState, ptpStats[master].LastOffset(), masterResource, ptp.PtpStateChange)
UpdateSyncStateMetrics(ptpStats[master].ProcessName(), alias, syncState)
if ptpOpts, ok := p.PtpConfigMapUpdates.PtpProcessOpts[profileName]; ok && ptpOpts != nil {
p.maybePublishOSClockSyncStateChangeEvent(ptpOpts, configName, profileName)
threshold := p.PtpThreshold(profileName, true)
if p.mock {
log.Infof("mock holdover is set to %s", ptpStats[MasterClockType].Alias())
} else {
go handleHoldOverState(p, ptpOpts, configName, profileName, threshold.HoldOverTimeout, ptpStats[MasterClockType].Alias(), threshold.Close)
}
}
ptpOpts := p.GetPtpProcessOpts(profileName, configName)
p.startHoldoverTimer(ptpOpts, configName, profileName, alias)
} else {
log.Warnf("holdover timer not started: master ProcessName %s != masterOffsetSource %s (profile=%s config=%s)",
ptpStats[master].ProcessName(), masterOffsetSource, profileName, configName)
}
}
}
}

func (p *PTPEventManager) startHoldoverTimer(
ptpOpts *ptpConfig.PtpProcessOpts, configName, profileName, alias string) {
if ptpOpts != nil {
p.maybePublishOSClockSyncStateChangeEvent(ptpOpts, configName, profileName)
} else {
log.Warnf("holdover timer: PtpProcessOpts missing for profile=%s config=%s (available=%v), continuing with default threshold",
profileName, configName, p.PtpConfigMapUpdates.PtpProcessOpts)
}
if p.mock {
log.Infof("holdover timer not started: mock mode (profile=%s config=%s alias=%s)", profileName, configName, alias)
return
}
threshold := p.PtpThreshold(profileName, true)
log.Infof("starting holdover timer: profile=%s config=%s alias=%s timeout=%ds",
profileName, configName, alias, threshold.HoldOverTimeout)
go handleHoldOverState(p, ptpOpts, configName, profileName, threshold.HoldOverTimeout, alias, threshold.Close)
}

func handleHoldOverState(ptpManager *PTPEventManager,
ptpOpts *ptpConfig.PtpProcessOpts, configName,
ptpProfileName string, holdoverTimeout int64,
Expand All @@ -168,25 +182,34 @@ func handleHoldOverState(ptpManager *PTPEventManager,
}()
select {
case <-c:
log.Infof("call received to close holderover timeout")
log.Infof("holdover timer cancelled: profile=%s config=%s alias=%s", ptpProfileName, configName, ptpIFace)
return
case <-time.After(time.Duration(holdoverTimeout) * time.Second):
log.Infof("holdover time expired for interface %s", ptpIFace)
log.Infof("holdover timer expired: profile=%s config=%s alias=%s timeout=%ds", ptpProfileName, configName, ptpIFace, holdoverTimeout)
ptpStats := ptpManager.GetStats(types.ConfigName(configName))
if mStats, found := ptpStats[master]; found {
if mStats.LastSyncState() == ptp.HOLDOVER { // if it was still in holdover while timing out then switch to FREERUN
log.Infof("HOLDOVER timeout after %d secs,setting clock state to FREERUN from HOLDOVER state for %s",
holdoverTimeout, master)
masterResource := fmt.Sprintf("%s/%s", mStats.Alias(), MasterClockType)
ptpStats[MasterClockType].SetLastSyncState(ptp.FREERUN)
ptpManager.PublishEvent(ptp.FREERUN, ptpStats[MasterClockType].LastOffset(), masterResource, ptp.PtpStateChange)
UpdateSyncStateMetrics(mStats.ProcessName(), mStats.Alias(), ptp.FREERUN)
// don't check of os clock sync state if phc2 not enabled
ptpManager.maybePublishOSClockSyncStateChangeEvent(ptpOpts, configName, ptpProfileName)
}
} else {
log.Errorf("failed to switch from holdover, could not find ptpStats for interface %s", ptpIFace)
mStats, found := ptpStats[master]
if !found {
log.Errorf("failed to switch from holdover, could not find ptpStats for config=%s profile=%s alias=%s", configName, ptpProfileName, ptpIFace)
return
}
if mStats.LastSyncState() != ptp.HOLDOVER {
log.Infof("holdover timer expired but state is %s (not HOLDOVER), no-op: profile=%s config=%s alias=%s",
mStats.LastSyncState(), ptpProfileName, configName, ptpIFace)
return
}
alias := mStats.Alias()
if alias == "" {
log.Errorf("failed to switch from holdover, empty alias for config=%s profile=%s", configName, ptpProfileName)
return
}
log.Infof("holdover expired, transitioning to FREERUN: profile=%s config=%s alias=%s timeout=%ds",
ptpProfileName, configName, alias, holdoverTimeout)
masterResource := fmt.Sprintf("%s/%s", alias, MasterClockType)
mStats.SetLastSyncState(ptp.FREERUN)
ptpManager.PublishEvent(ptp.FREERUN, mStats.LastOffset(), masterResource, ptp.PtpStateChange)
UpdateSyncStateMetrics(mStats.ProcessName(), alias, ptp.FREERUN)
// don't check of os clock sync state if phc2 not enabled
ptpManager.maybePublishOSClockSyncStateChangeEvent(ptpOpts, configName, ptpProfileName)
}
}

Expand Down
Loading
Loading