diff --git a/cmd/main.go b/cmd/main.go index 41b22f1d..9c6ef782 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -418,7 +418,9 @@ func ProcessInChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) { out.Status = channel.SUCCESS } } - scConfig.EventOutCh <- &out + if !common.SendToChannel(scConfig.EventOutCh, &out) { + log.Warningf("EventOutCh full, dropping ack for %s", d.Address) + } } else if d.Type == channel.STATUS && d.Status == channel.NEW { log.Warnf("event disabled,no action taken(can't send to a destination): logging new status check %v\n", d) out := channel.DataChan{ diff --git a/cmd/main_test.go b/cmd/main_test.go index 4a6928bb..e96723f5 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -26,16 +26,30 @@ var ( ) func storeCleanUp() { - _ = scConfig.PubSubAPI.DeleteAllPublishers() - _ = scConfig.PubSubAPI.DeleteAllSubscriptions() + if scConfig != nil && scConfig.PubSubAPI != nil { + log.Info("deleting all publishers") + _ = scConfig.PubSubAPI.DeleteAllPublishers() + } + if scConfig != nil && scConfig.SubscriberAPI != nil { + log.Info("deleting all subscription") + _, _ = scConfig.SubscriberAPI.DeleteAllSubscriptions() + } } func TestSidecar_MainWithHTTP(t *testing.T) { apiPort = 8990 - defer storeCleanUp() + + // Create a unique temporary directory for this test run to avoid conflicts + tempDir, err := os.MkdirTemp("", "sidecar-test-*") + assert.NoError(t, err) + defer func() { + storeCleanUp() + os.RemoveAll(tempDir) // Clean up temp directory + }() + wg := &sync.WaitGroup{} pl := plugins.Handler{Path: "../plugins"} - var storePath = "." + var storePath = tempDir if sPath, ok := os.LookupEnv("STORE_PATH"); ok && sPath != "" { storePath = sPath } @@ -55,10 +69,14 @@ func TestSidecar_MainWithHTTP(t *testing.T) { Err: nil, }, } + + // Clean up any existing state before starting the test + storeCleanUp() + log.Infof("Configuration set to %#v", scConfig) //start rest service - err := common.StartPubSubService(scConfig) + err = common.StartPubSubService(scConfig) assert.Nil(t, err) // imitate main process diff --git a/pkg/common/common.go b/pkg/common/common.go index 8ded3e49..4f88b247 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -46,6 +46,20 @@ import ( log "github.com/sirupsen/logrus" ) +const sendTimeout = 5 * time.Second + +// SendToChannel sends data to a channel with a timeout. +// Returns true if the send succeeded, false if the channel was full for the +// entire timeout duration. +func SendToChannel(ch chan *channel.DataChan, d *channel.DataChan) bool { + select { + case ch <- d: + return true + case <-time.After(sendTimeout): + return false + } +} + // TransportType defines transport type supported type TransportType int @@ -324,28 +338,39 @@ func PublishEvent(scConfig *SCConfiguration, e ceevent.Event) error { func PublishEventViaAPI(scConfig *SCConfiguration, cneEvent ceevent.Event, resourceAddress string) error { if ceEvent, err := GetPublishingCloudEvent(scConfig, cneEvent); err == nil { if IsV1Api(scConfig.APIVersion) { - scConfig.EventInCh <- &channel.DataChan{ + d := &channel.DataChan{ Type: channel.EVENT, Status: channel.NEW, Data: ceEvent, Address: ceEvent.Source(), // this is the publishing address ClientID: scConfig.ClientID(), } + if SendToChannel(scConfig.EventInCh, d) { + log.Debugf("event source %s sent to queue to process", ceEvent.Source()) + log.Debugf("event sent %s", cneEvent.JSONString()) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1) + } else { + log.Warningf("EventInCh full for %s, dropping event for %s", sendTimeout, ceEvent.Source()) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.FAIL, 1) + } } else { // use EventOutCh instead of EventInCh to bypass http transport - scConfig.EventOutCh <- &channel.DataChan{ + d := &channel.DataChan{ Type: channel.EVENT, Status: channel.NEW, Data: ceEvent, Address: resourceAddress, // this is the publishing address ClientID: scConfig.ClientID(), } + if SendToChannel(scConfig.EventOutCh, d) { + log.Debugf("event source %s sent to queue to process", ceEvent.Source()) + log.Debugf("event sent %s", cneEvent.JSONString()) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1) + } else { + log.Warningf("EventOutCh full for %s, dropping event for %s", sendTimeout, resourceAddress) + localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.FAIL, 1) + } } - - log.Debugf("event source %s sent to queue to process", ceEvent.Source()) - log.Debugf("event sent %s", cneEvent.JSONString()) - - localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1) } return nil } diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 2e5785bf..12c0590d 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -17,12 +17,14 @@ package common_test import ( "net" "testing" + "time" + "github.com/redhat-cne/cloud-event-proxy/pkg/common" + "github.com/redhat-cne/sdk-go/pkg/channel" + ceevent "github.com/redhat-cne/sdk-go/pkg/event" "github.com/redhat-cne/sdk-go/pkg/types" - + v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub" log "github.com/sirupsen/logrus" - - "github.com/redhat-cne/cloud-event-proxy/pkg/common" "github.com/stretchr/testify/assert" ) @@ -61,3 +63,42 @@ func TestTransportHost_ParseTransportHost(t *testing.T) { } } } + +func TestPublishEventViaAPI_NonBlockingWhenChannelFull(t *testing.T) { + // Create a channel with buffer size 1 and fill it + eventOutCh := make(chan *channel.DataChan, 1) + eventOutCh <- &channel.DataChan{} // fill the buffer + + pubSubAPI := v1pubsub.GetAPIInstance("/tmp/test-store") + pub, _ := pubSubAPI.CreatePublisher(v1pubsub.NewPubSub( + types.ParseURI("http://localhost/dummy"), + "/test/resource", + "1.0", + )) + + scConfig := &common.SCConfiguration{ + EventOutCh: eventOutCh, + PubSubAPI: pubSubAPI, + } + + // Create event with matching publisher ID + event := ceevent.Event{ID: pub.ID} + + // PublishEventViaAPI should return after the 5s timeout (not block forever) + // even though the channel is full + done := make(chan struct{}) + go func() { + _ = common.PublishEventViaAPI(scConfig, event, "/test/resource") + close(done) + }() + + select { + case <-done: + // success — returned after timeout, did not block forever + case <-time.After(10 * time.Second): + t.Fatal("PublishEventViaAPI blocked on full EventOutCh — should return after 5s timeout") + } + + // Channel should still have exactly 1 item (the original, not the new one) + assert.Equal(t, 1, len(eventOutCh)) +} diff --git a/plugins/ptp_operator/metrics/filesystem.go b/plugins/ptp_operator/metrics/filesystem.go new file mode 100644 index 00000000..59a455f9 --- /dev/null +++ b/plugins/ptp_operator/metrics/filesystem.go @@ -0,0 +1,31 @@ +package metrics + +import ( + "os" +) + +type WFiles interface { + WriteFile(name string, data []byte, perm os.FileMode) error +} + +type OSFileSystem struct { +} + +func (f OSFileSystem) WriteFile(name string, data []byte, perm os.FileMode) error { + return os.WriteFile(name, data, perm) +} + +type MockFileSystem struct { + WriteCount int +} + +func (m *MockFileSystem) WriteFile(name string, data []byte, perm os.FileMode) error { + m.WriteCount += 1 + return nil +} + +func (m *MockFileSystem) Clear() { + m.WriteCount = 0 +} + +var Filesystem WFiles = OSFileSystem{} diff --git a/plugins/ptp_operator/metrics/manager.go b/plugins/ptp_operator/metrics/manager.go index 2048a87e..122954de 100644 --- a/plugins/ptp_operator/metrics/manager.go +++ b/plugins/ptp_operator/metrics/manager.go @@ -1,11 +1,15 @@ package metrics import ( + "encoding/json" "fmt" "net/http" + "os" "path" + "path/filepath" "strings" "sync" + "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -34,6 +38,7 @@ type PTPEventManager struct { // Ptp4lConfigInterfaces holds interfaces and its roles, after reading from ptp4l config files Ptp4lConfigInterfaces map[types.ConfigName]*ptp4lconf.PTP4lConfig lastOverallSyncState ptp.SyncState + processStates map[types.ConfigName]map[string]bool } // NewPTPEventManager to manage events and metrics @@ -48,6 +53,7 @@ func NewPTPEventManager(resourcePrefix string, publisherTypes map[ptp.EventType] Stats: map[types.ConfigName]stats.PTPStats{}, Ptp4lConfigInterfaces: make(map[types.ConfigName]*ptp4lconf.PTP4lConfig), mock: false, + processStates: make(map[types.ConfigName]map[string]bool), } // attach ptp config updates ptpEventManager.PtpConfigMapUpdates = ptpConfig.NewLinuxPTPConfUpdate() @@ -111,6 +117,16 @@ func (p *PTPEventManager) AddPTPConfig(fileName types.ConfigName, ptpCfg *ptp4lc p.lock.Unlock() } +func (p *PTPEventManager) UpdateSyncState(syncState ptp.SyncState) { + p.lock.Lock() + defer p.lock.Unlock() + p.lastOverallSyncState = syncState + err := p.saveToStore() + if err != nil { + log.Errorf("failed to save metrics to store: %s", err) + } +} + // GetPTPConfig ... Add PtpConfigUpdate obj func (p *PTPEventManager) GetPTPConfig(configName types.ConfigName) *ptp4lconf.PTP4lConfig { if _, ok := p.Ptp4lConfigInterfaces[configName]; ok && p.Ptp4lConfigInterfaces[configName] != nil { @@ -343,30 +359,34 @@ func (p *PTPEventManager) PublishEvent(state ptp.SyncState, ptpOffset int64, sou if state == "" { return } + // Handle mock mode if p.mock { p.mockEvent = []ptp.EventType{eventType} if eventType == ptp.PtpStateChange || eventType == ptp.OsClockSyncStateChange { p.mockEvent = append(p.mockEvent, ptp.SyncStateChange) } log.Infof("PublishEvent state=%s, ptpOffset=%d, source=%s, eventType=%s", state, ptpOffset, source, eventType) - return + } else { + // /cluster/xyz/ptp/CLOCK_REALTIME this is not address the event is published to + data := p.GetPTPEventsData(state, ptpOffset, source, eventType) + resourceAddress := path.Join(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource)) + p.publish(*data, resourceAddress, eventType) } - // /cluster/xyz/ptp/CLOCK_REALTIME this is not address the event is published to - data := p.GetPTPEventsData(state, ptpOffset, source, eventType) - resourceAddress := path.Join(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource)) - p.publish(*data, resourceAddress, eventType) - // publish the event again as overall sync state + // Common logic for both mock and non-mock modes: handle node state aggregation // SyncStateChange is the overall sync state including PtpStateChange and OsClockSyncStateChange if eventType == ptp.PtpStateChange || eventType == ptp.OsClockSyncStateChange { nodeState := p.GetNodeSyncState(state) - if state != p.lastOverallSyncState { - eventType = ptp.SyncStateChange - source = string(p.publisherTypes[eventType].Resource) - data = p.GetPTPEventsData(state, ptpOffset, source, eventType) - resourceAddress = path.Join(p.resourcePrefix, p.nodeName, source) - p.publish(*data, resourceAddress, eventType) - p.lastOverallSyncState = nodeState + if nodeState != p.lastOverallSyncState { + if !p.mock { + // In non-mock mode, also publish the SyncStateChange event + eventType = ptp.SyncStateChange + source = string(p.publisherTypes[eventType].Resource) + data := p.GetPTPEventsData(nodeState, ptpOffset, source, eventType) + resourceAddress := path.Join(p.resourcePrefix, p.nodeName, source) + p.publish(*data, resourceAddress, eventType) + } + p.UpdateSyncState(nodeState) } } } @@ -593,7 +613,7 @@ func (p *PTPEventManager) GetNodeSyncState(currentState ptp.SyncState) ptp.SyncS if s != ptp.FREERUN && s != ptp.HOLDOVER && s != ptp.LOCKED { continue } - finalState = OverallState(s, finalState) + finalState = OverallState(finalState, s) found = true } } @@ -631,13 +651,110 @@ func OverallState(current, updated ptp.SyncState) ptp.SyncState { const logsEndpoint = "http://localhost:8081/emit-logs" -// TriggerLogs makes an HTTP request to the linuxptp-daemon to re-emit -// all metrics logs so that cloud-event-proxy can repopulate its state. +type SavedMetrics struct { + Timestamp int64 `json:"timestamp"` + LastClockState ptp.SyncState `json:"last_clock_state"` + PortRoles map[types.ConfigName]map[string]types.PtpPortRole `json:"portRoles"` + ProcessStates map[types.ConfigName]map[string]bool `json:"process_states"` +} + +const metricFilename = "metrics.json" + +func (p *PTPEventManager) LoadFromStore(config *common.SCConfiguration) (int64, error) { + contents, err := os.ReadFile(filepath.Join(config.StorePath, metricFilename)) + if os.IsNotExist(err) { + return 0, nil + } + var savedMetrics SavedMetrics + err = json.Unmarshal(contents, &savedMetrics) + if err != nil { + return 0, fmt.Errorf("failed to unmarshal saved metric data: %w", err) + } + p.lastOverallSyncState = savedMetrics.LastClockState + p.processStates = savedMetrics.ProcessStates + + for configName, portAndRole := range savedMetrics.PortRoles { + for port, role := range portAndRole { + ptpConf := p.GetPTPConfig(configName) + if ptpConf == nil { + continue + } + ptpInterface, ifErr := ptpConf.ByInterface(port) + if ifErr != nil { + continue + } + ptpInterface.UpdateRole(role) + } + } + return savedMetrics.Timestamp, nil +} + +func (p *PTPEventManager) saveToStore() error { + // Skip saving if scConfig is not available (e.g., in test scenarios) + if p.scConfig == nil { + return nil + } + + PortRoles := make(map[types.ConfigName]map[string]types.PtpPortRole) + + for configName, ptp4lConf := range p.Ptp4lConfigInterfaces { + for _, ptpInterface := range ptp4lConf.Interfaces { + if _, ok := PortRoles[configName]; !ok { + PortRoles[configName] = make(map[string]types.PtpPortRole) + } + PortRoles[configName][ptpInterface.Name] = ptpInterface.Role + } + } + + data := SavedMetrics{ + Timestamp: time.Now().Unix(), // TODO track last change to values. + LastClockState: p.lastOverallSyncState, + PortRoles: PortRoles, + } + + content, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("failed to marshall save data fron current data: %w", err) + } + err = Filesystem.WriteFile(filepath.Join(p.scConfig.StorePath, metricFilename), content, 0600) + if err != nil { + return fmt.Errorf("failed to write save metrics file: %w", err) + } + return nil +} + +func (p *PTPEventManager) SetInitalMetrics() { + for cfgName, ptp4lConf := range p.Ptp4lConfigInterfaces { + for _, ptpInterface := range ptp4lConf.Interfaces { + updateRoleMetricFromData(string(cfgName), ptpInterface.Name, ptpInterface.Role) + } + } + for cfgName, procs := range p.processStates { + for processName, status := range procs { + var val int64 = 0 + if status { + val = 1 + } + UpdateProcessStatusMetrics(processName, string(cfgName), val) + } + } +} + func (p *PTPEventManager) TriggerLogs() error { - resp, err := http.Get(logsEndpoint) //nolint:gosec + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(logsEndpoint) //nolint:gosec if err != nil { return fmt.Errorf("failed to trigger logs: %w", err) } defer resp.Body.Close() return nil } + +func updateRoleMetricFromData(configName, portName string, portRole types.PtpPortRole) { + configNameParts := strings.Split(configName, ".") + process := ptp4lProcessName + if len(configNameParts) > 0 { + process = configNameParts[0] + } + UpdateInterfaceRoleMetrics(process, portName, portRole) +} diff --git a/plugins/ptp_operator/metrics/manager_test.go b/plugins/ptp_operator/metrics/manager_test.go index 46529a38..4f2ef7ba 100644 --- a/plugins/ptp_operator/metrics/manager_test.go +++ b/plugins/ptp_operator/metrics/manager_test.go @@ -6,14 +6,16 @@ package metrics_test import ( "testing" - "github.com/redhat-cne/cloud-event-proxy/plugins/ptp_operator/types" - "sync" + "github.com/redhat-cne/cloud-event-proxy/pkg/common" ptpConfig "github.com/redhat-cne/cloud-event-proxy/plugins/ptp_operator/config" "github.com/redhat-cne/cloud-event-proxy/plugins/ptp_operator/metrics" "github.com/redhat-cne/cloud-event-proxy/plugins/ptp_operator/stats" + "github.com/redhat-cne/cloud-event-proxy/plugins/ptp_operator/types" + "github.com/redhat-cne/sdk-go/pkg/channel" "github.com/redhat-cne/sdk-go/pkg/event/ptp" + v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub" "github.com/stretchr/testify/assert" ) @@ -407,3 +409,63 @@ func TestOverallState(t *testing.T) { }) } } + +func TestPublishEvent_UsesAggregatedNodeStateAndDedupes(t *testing.T) { + // Arrange + metrics.Filesystem = &metrics.MockFileSystem{} + pubTypes := map[ptp.EventType]*types.EventPublisherType{ + ptp.PtpStateChange: { + EventType: ptp.PtpStateChange, + Resource: ptp.PtpLockState, + PubID: "pub1", + }, + ptp.OsClockSyncStateChange: { + EventType: ptp.OsClockSyncStateChange, + Resource: ptp.OsClockSyncState, + PubID: "pub2", + }, + ptp.SyncStateChange: { + EventType: ptp.SyncStateChange, + Resource: ptp.SyncStatusState, + PubID: "pub3", + }, + } + sc := &common.SCConfiguration{ + EventOutCh: make(chan *channel.DataChan, 10), + PubSubAPI: v1pubsub.GetAPIInstance("/tmp/store"), + SubscriberAPI: nil, + StorePath: "/tmp/store", + } + mgr := metrics.NewPTPEventManager("/cluster/node", pubTypes, "testnode", sc) + + // Build stats across configs to force aggregated state decisions + cfg1 := types.ConfigName("ptp4l.0.config") + cfg2 := types.ConfigName("phc2sys.0.config") + s1 := stats.NewStats(string(cfg1)) + s2 := stats.NewStats(string(cfg2)) + + // Scenario A: Master LOCKED, OS FREERUN -> aggregated should be FREERUN + s1.SetLastSyncState(ptp.LOCKED) + s2.SetLastSyncState(ptp.FREERUN) + mgr.SetStats(cfg1, stats.PTPStats{metrics.MasterClockType: s1}) + mgr.SetStats(cfg2, stats.PTPStats{metrics.ClockRealTime: s2}) + + mockFS := metrics.Filesystem.(*metrics.MockFileSystem) + mockFS.Clear() + + // Act: Publish master state change; should compute node FREERUN and persist once + mgr.MockTest(true) + mgr.PublishEvent(ptp.LOCKED, 0, "ens1f0/master", ptp.PtpStateChange) + assert.Equal(t, 1, mockFS.WriteCount, "expected a single persist when node state changes to FREERUN") + + // Act again with same overall state; should not persist again (dedup) + mgr.PublishEvent(ptp.LOCKED, 0, "ens1f0/master", ptp.PtpStateChange) + assert.Equal(t, 1, mockFS.WriteCount, "no additional persist expected when node state unchanged") + + // Scenario B: Now both LOCKED -> aggregated should move to LOCKED; expect another persist + s2.SetLastSyncState(ptp.LOCKED) + mgr.SetStats(cfg2, stats.PTPStats{metrics.ClockRealTime: s2}) + mockFS.WriteCount = 1 // Reset to account for the SetStats write, keeping the previous PublishEvent write + mgr.PublishEvent(ptp.LOCKED, 0, metrics.ClockRealTime, ptp.OsClockSyncStateChange) + assert.Equal(t, 2, mockFS.WriteCount, "expected persist when node state changes to LOCKED") +} diff --git a/plugins/ptp_operator/ptp_operator_plugin.go b/plugins/ptp_operator/ptp_operator_plugin.go index 2efcd88b..22667d42 100644 --- a/plugins/ptp_operator/ptp_operator_plugin.go +++ b/plugins/ptp_operator/ptp_operator_plugin.go @@ -651,12 +651,16 @@ func listenToSocket(wg *sync.WaitGroup) { } func processMessages(c net.Conn) { - // A new socket connection means the daemon (re)connected. - // Request a full state re-emit so metrics are populated after restart. + // Request a full state re-emit in a separate goroutine so the scanner + // can start reading immediately. TriggerLogs writes emit data back through + // this same socket connection; if we block here waiting for the HTTP response, + // nobody reads the socket, the kernel buffer fills, and the emit handler blocks. if eventManager != nil { - if err := eventManager.TriggerLogs(); err != nil { - log.Warnf("failed to trigger logs on new connection: %v", err) - } + go func() { + if err := eventManager.TriggerLogs(); err != nil { + log.Warnf("failed to trigger logs on new connection: %v", err) + } + }() } scanner := bufio.NewScanner(c) for {