diff --git a/plugins/ptp_operator/metrics/manager.go b/plugins/ptp_operator/metrics/manager.go index c4f45461..0111fb12 100644 --- a/plugins/ptp_operator/metrics/manager.go +++ b/plugins/ptp_operator/metrics/manager.go @@ -441,6 +441,8 @@ func (p *PTPEventManager) GenPTPEvent(ptpProfileName string, oStats *stats.Stats } lastClockState := oStats.LastSyncState() + log.Infof("DEBUG GenPTPEvent: profile=%s resource=%s offset=%d clockState=%s lastState=%s eventType=%s", + ptpProfileName, eventResourceName, ptpOffset, clockState, lastClockState, eventType) threshold := p.PtpThreshold(ptpProfileName, false) switch clockState { case ptp.LOCKED: @@ -795,12 +797,15 @@ func (p *PTPEventManager) SetInitalMetrics() { } func (p *PTPEventManager) TriggerLogs() error { + log.Infof("DEBUG TriggerLogs: calling %s", logsEndpoint) client := &http.Client{Timeout: 5 * time.Second} resp, err := client.Get(logsEndpoint) if err != nil { + log.Infof("DEBUG TriggerLogs: request failed: %v", err) return err } defer resp.Body.Close() + log.Infof("DEBUG TriggerLogs: response status=%s", resp.Status) return nil } diff --git a/plugins/ptp_operator/metrics/metrics.go b/plugins/ptp_operator/metrics/metrics.go index 910ebe20..792240e6 100644 --- a/plugins/ptp_operator/metrics/metrics.go +++ b/plugins/ptp_operator/metrics/metrics.go @@ -107,6 +107,7 @@ func (p *PTPEventManager) ExtractMetrics(msg string) { } processName := fields[processNameIndex] configName := fields[configNameIndex] + log.Debugf("DEBUG ExtractMetrics: process=%s config=%s msg=%.100s", processName, configName, msg) // if log is having ts2phc then it will replace it with ptp4l configName = strings.Replace(configName, ts2phcProcessName, ptp4lProcessName, 1) ptp4lCfg := p.GetPTPConfig(types.ConfigName(configName)) @@ -120,6 +121,7 @@ func (p *PTPEventManager) ExtractMetrics(msg string) { // Process status messages should always be processed regardless of profile configuration if strings.Contains(output, ptpProcessStatusIdentifier) { if status, err := p.parsePTPStatus(output, fields); err == nil { + log.Infof("DEBUG ExtractMetrics: process status detected: process=%s config=%s status=%d", processName, configName, status) if status == PtpProcessDown { p.processDownEvent(profileName, processName, ptpStats) } @@ -223,6 +225,7 @@ func (p *PTPEventManager) ExtractMetrics(msg string) { if interfaceName == "" { return // don't do if iface not known } + log.Infof("DEBUG ExtractMetrics: offset parsed: process=%s iface=%s offset=%.0f syncState=%s", processName, interfaceName, ptpOffset, syncState) // only ts2phc process will return actual interface name- allow all ts2phcprocess or iface in (master or clock realtime) if processName != ts2phcProcessName && !(interfaceName == master || interfaceName == ClockRealTime) { return // only master and clock_realtime are supported diff --git a/plugins/ptp_operator/metrics/registry.go b/plugins/ptp_operator/metrics/registry.go index 305e835f..f3e535f6 100644 --- a/plugins/ptp_operator/metrics/registry.go +++ b/plugins/ptp_operator/metrics/registry.go @@ -242,6 +242,7 @@ func UpdateSyncStateMetrics(process, iface string, state ptp.SyncState) { log.Errorf("wrong metrics are processed, ignoring interface %s with process %s", iface, process) return } + log.Infof("DEBUG UpdateSyncStateMetrics: process=%s iface=%s state=%s value=%.0f", process, iface, state, clockState) SyncState.With(prometheus.Labels{ "process": process, "node": ptpNodeName, "iface": iface}).Set(clockState) } diff --git a/plugins/ptp_operator/ptp_operator_plugin.go b/plugins/ptp_operator/ptp_operator_plugin.go index 5395682a..c652eb67 100644 --- a/plugins/ptp_operator/ptp_operator_plugin.go +++ b/plugins/ptp_operator/ptp_operator_plugin.go @@ -55,7 +55,10 @@ const ( // restartCommand is the control command sent by linuxptp-daemon on the event // socket to request a sidecar restart. The CMD prefix distinguishes it from // process log lines (which always start with a process name like ptp4l, phc2sys). - restartCommand = "CMD RESTART" + restartCommand = "CMD RESTART" + // liveStartCommand is sent by linuxptp-daemon on each ptp4l process connection + // after the replay gate opens, indicating that all subsequent data is live. + liveStartCommand = "CMD LIVE_START" phc2sysProcessName = "phc2sys" ptp4lProcessName = "ptp4l" ts2PhcProcessName = "ts2phc" @@ -515,16 +518,24 @@ func listenToSocket(wg *sync.WaitGroup) { } func processMessages(c net.Conn) { + log.Infof("DEBUG processMessages: new connection accepted from %v", c.RemoteAddr()) <-aliasReady // wait until alias found and this is closed - log.Info("alias found, starting to process messages") + log.Info("DEBUG processMessages: aliasReady unblocked, processing can begin") // Request a full state re-emit in a separate goroutine so the scanner - // can start reading immediately. TriggerLogs writes emit data back through - // this same socket connection; if we block here waiting for the HTTP response, - // nobody reads the socket, the kernel buffer fills, and the emit handler blocks. + // can start reading immediately. The /emit-logs handler writes replay + // data back through the EventHandler's socket connection, which CEP + // accepts as a separate processMessages goroutine. If TriggerLogs blocks + // here, that goroutine's TriggerLogs creates a recursive write-back to + // a connection whose reader is stuck in TriggerLogs — deadlock once the + // kernel buffer fills. The daemon's liveGate independently ensures no + // live data flows until replay completes, so async is safe. if eventManager != nil { + log.Info("DEBUG processMessages: firing async TriggerLogs") go func() { if err := eventManager.TriggerLogs(); err != nil { log.Warnf("failed to trigger logs on new connection: %v", err) + } else { + log.Info("DEBUG processMessages: TriggerLogs completed successfully") } }() } @@ -532,14 +543,20 @@ func processMessages(c net.Conn) { for { ok := scanner.Scan() if !ok { - log.Error("error reading socket input, retrying") + log.Errorf("DEBUG processMessages: scanner returned false (conn closed or error), breaking") break } msg := scanner.Text() if msg == restartCommand { + log.Info("DEBUG processMessages: received CMD RESTART") restartProcess("restart requested by daemon via socket") return // unreachable after exec } + if msg == liveStartCommand { + log.Info("DEBUG processMessages: received LIVE_START marker - live data follows") + continue + } + log.Debugf("DEBUG processMessages: dispatching to ExtractMetrics: %.120s", msg) eventManager.ExtractMetrics(msg) } } diff --git a/plugins/ptp_operator/ptp_operator_plugin_test.go b/plugins/ptp_operator/ptp_operator_plugin_test.go index 0081c502..94c050e6 100644 --- a/plugins/ptp_operator/ptp_operator_plugin_test.go +++ b/plugins/ptp_operator/ptp_operator_plugin_test.go @@ -21,11 +21,14 @@ import ( "encoding/json" "fmt" "log" + "net" + "net/http" "os" "path" "strings" "sync" "testing" + "time" v2 "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" @@ -407,6 +410,216 @@ func buildEvent(node string, source ptpEvent.EventResource, eventType ptpEvent.E return e } +// startMockLogsServer starts a minimal HTTP server on the logsEndpoint port +// (8081) so that eventManager.TriggerLogs() succeeds during tests. +// Returns a cleanup function that shuts down the server. +func startMockLogsServer(t *testing.T) func() { + t.Helper() + mux := http.NewServeMux() + mux.HandleFunc("/emit-logs", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + srv := &http.Server{Addr: ":8081", Handler: mux} + var ln net.Listener + var err error + for i := 0; i < 10; i++ { + ln, err = net.Listen("tcp", ":8081") + if err == nil { + break + } + time.Sleep(50 * time.Millisecond) + } + if err != nil { + t.Fatalf("failed to listen on :8081 for mock logs server: %v", err) + } + go func() { _ = srv.Serve(ln) }() + return func() { _ = srv.Close() } +} + +// setupProcessMessages prepares the globals that processMessages depends on. +// aliasReady is returned already closed so processMessages doesn't block. +// eventManager is configured in mock-test mode. +func setupProcessMessages(t *testing.T) func() { + t.Helper() + oldAliasReady := aliasReady + oldEventManager := eventManager + + aliasReady = make(chan struct{}) + close(aliasReady) + + scCfg := &common.SCConfiguration{} + eventManager = metrics.NewPTPEventManager(resourcePrefix, pubsubTypes, nodeName, scCfg) + eventManager.MockTest(true) + + return func() { + aliasReady = oldAliasReady + eventManager = oldEventManager + } +} + +func TestProcessMessages_LiveStartSkipped(t *testing.T) { + cleanup := startMockLogsServer(t) + defer cleanup() + restore := setupProcessMessages(t) + defer restore() + + serverConn, clientConn := net.Pipe() + done := make(chan struct{}) + go func() { + processMessages(serverConn) + close(done) + }() + + _, err := fmt.Fprintf(clientConn, "%s\n", liveStartCommand) + assert.NoError(t, err) + _ = clientConn.Close() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("processMessages did not return after CMD LIVE_START + EOF") + } +} + +func TestProcessMessages_LiveStartBetweenRegularMessages(t *testing.T) { + cleanup := startMockLogsServer(t) + defer cleanup() + restore := setupProcessMessages(t) + defer restore() + + serverConn, clientConn := net.Pipe() + done := make(chan struct{}) + go func() { + processMessages(serverConn) + close(done) + }() + + msgs := []string{ + "ptp4l[123.456]: [ptp4l.0.config] master offset 5 s2 freq -1000 path delay 100", + liveStartCommand, + "ptp4l[123.457]: [ptp4l.0.config] master offset 3 s2 freq -998 path delay 99", + liveStartCommand, + "phc2sys[123.458]: [ptp4l.0.config] CLOCK_REALTIME phc offset 10 s2 freq -500 delay 200", + } + for _, m := range msgs { + _, err := fmt.Fprintf(clientConn, "%s\n", m) + assert.NoError(t, err) + } + _ = clientConn.Close() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("processMessages did not return after mixed LIVE_START and regular messages") + } +} + +func TestProcessMessages_TriggerLogsFailureNonBlocking(t *testing.T) { + // No mock HTTP server → TriggerLogs will fail, but since it's async + // the scanner should still process messages on this connection. + restore := setupProcessMessages(t) + defer restore() + + serverConn, clientConn := net.Pipe() + done := make(chan struct{}) + go func() { + processMessages(serverConn) + close(done) + }() + + // Even though TriggerLogs fails (no HTTP server), processMessages must + // still read from the connection because TriggerLogs is async. + _, err := fmt.Fprintf(clientConn, "%s\n", liveStartCommand) + assert.NoError(t, err) + _ = clientConn.Close() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("processMessages should still process messages when TriggerLogs fails async") + } +} + +func TestProcessMessages_MultipleLiveStartOnly(t *testing.T) { + cleanup := startMockLogsServer(t) + defer cleanup() + restore := setupProcessMessages(t) + defer restore() + + serverConn, clientConn := net.Pipe() + done := make(chan struct{}) + go func() { + processMessages(serverConn) + close(done) + }() + + for i := 0; i < 5; i++ { + _, err := fmt.Fprintf(clientConn, "%s\n", liveStartCommand) + assert.NoError(t, err) + } + _ = clientConn.Close() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("processMessages did not return after multiple CMD LIVE_START messages") + } +} + +func TestLiveStartCommand_ConstantValue(t *testing.T) { + assert.Equal(t, "CMD LIVE_START", liveStartCommand) + assert.NotEqual(t, restartCommand, liveStartCommand, + "LIVE_START and RESTART must be distinct commands") + assert.True(t, strings.HasPrefix(liveStartCommand, "CMD "), + "control commands should use CMD prefix to distinguish from log lines") +} + +func TestProcessMessages_AliasReadyGating(t *testing.T) { + cleanup := startMockLogsServer(t) + defer cleanup() + + oldAliasReady := aliasReady + oldEventManager := eventManager + defer func() { + aliasReady = oldAliasReady + eventManager = oldEventManager + }() + + aliasReady = make(chan struct{}) // NOT closed — processMessages should block + scCfg := &common.SCConfiguration{} + eventManager = metrics.NewPTPEventManager(resourcePrefix, pubsubTypes, nodeName, scCfg) + eventManager.MockTest(true) + + serverConn, clientConn := net.Pipe() + defer func() { _ = clientConn.Close() }() + + started := make(chan struct{}) + done := make(chan struct{}) + go func() { + close(started) + processMessages(serverConn) + close(done) + }() + <-started + + select { + case <-done: + t.Fatal("processMessages should block while aliasReady is open") + case <-time.After(200 * time.Millisecond): + } + + close(aliasReady) + + _, _ = fmt.Fprintf(clientConn, "%s\n", liveStartCommand) + _ = clientConn.Close() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("processMessages did not unblock after aliasReady closed") + } +} + func getMockOverrideFn() func(e v2.Event, d *channel.DataChan) error { return func(e v2.Event, d *channel.DataChan) error { if e.Source() != "" {