Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/ptp_operator/metrics/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ func (p *PTPEventManager) GenPTPEvent(ptpProfileName string, oStats *stats.Stats
}

lastClockState := oStats.LastSyncState()
log.Infof("DEBUG GenPTPEvent: profile=%s resource=%s offset=%d clockState=%s lastState=%s eventType=%s",
ptpProfileName, eventResourceName, ptpOffset, clockState, lastClockState, eventType)
threshold := p.PtpThreshold(ptpProfileName, false)
switch clockState {
case ptp.LOCKED:
Expand Down Expand Up @@ -795,12 +797,15 @@ func (p *PTPEventManager) SetInitalMetrics() {
}

// TriggerLogs issues a single HTTP GET against logsEndpoint using a client
// with a five second timeout.
//
// It returns the error reported by the HTTP client when the request cannot
// be completed, and nil otherwise. The response status is only logged, not
// inspected, so any response that is received counts as success; the body
// is closed without being read.
func (p *PTPEventManager) TriggerLogs() error {
	const requestTimeout = 5 * time.Second

	log.Infof("DEBUG TriggerLogs: calling %s", logsEndpoint)
	httpClient := http.Client{Timeout: requestTimeout}
	response, getErr := httpClient.Get(logsEndpoint)
	if getErr == nil {
		// The deferred close runs when TriggerLogs returns.
		defer response.Body.Close()
		log.Infof("DEBUG TriggerLogs: response status=%s", response.Status)
		return nil
	}
	log.Infof("DEBUG TriggerLogs: request failed: %v", getErr)
	return getErr
}

Expand Down
3 changes: 3 additions & 0 deletions plugins/ptp_operator/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (p *PTPEventManager) ExtractMetrics(msg string) {
}
processName := fields[processNameIndex]
configName := fields[configNameIndex]
log.Debugf("DEBUG ExtractMetrics: process=%s config=%s msg=%.100s", processName, configName, msg)
// If the config name contains ts2phc, replace its first occurrence with ptp4l.
configName = strings.Replace(configName, ts2phcProcessName, ptp4lProcessName, 1)
ptp4lCfg := p.GetPTPConfig(types.ConfigName(configName))
Expand All @@ -120,6 +121,7 @@ func (p *PTPEventManager) ExtractMetrics(msg string) {
// Process status messages should always be processed regardless of profile configuration
if strings.Contains(output, ptpProcessStatusIdentifier) {
if status, err := p.parsePTPStatus(output, fields); err == nil {
log.Infof("DEBUG ExtractMetrics: process status detected: process=%s config=%s status=%d", processName, configName, status)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Infof in the hot parser path will flood logs and hurt throughput.

Line 124 and Line 228 log per-message parse details at info level inside ExtractMetrics, which is a high-frequency path. This can create heavy I/O and noisy operational logs under load.

Suggested fix
-			log.Infof("DEBUG ExtractMetrics: process status detected: process=%s config=%s status=%d", processName, configName, status)
+			log.Debugf("DEBUG ExtractMetrics: process status detected: process=%s config=%s status=%d", processName, configName, status)
@@
-			log.Infof("DEBUG ExtractMetrics: offset parsed: process=%s iface=%s offset=%.0f syncState=%s", processName, interfaceName, ptpOffset, syncState)
+			log.Debugf("DEBUG ExtractMetrics: offset parsed: process=%s iface=%s offset=%.0f syncState=%s", processName, interfaceName, ptpOffset, syncState)

Also applies to: 228-228

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/ptp_operator/metrics/metrics.go` at line 124, The ExtractMetrics hot
path is using log.Infof for per-message parse details which will flood logs;
change those per-message Infof calls in ExtractMetrics (the lines logging
processName, configName, status and the other per-message parse log around line
228) to a debug-level log (e.g., log.Debugf) or wrap them in a debug-enabled
guard (e.g., if log.IsDebugEnabled() { log.Debugf(...) }) so normal operation
uses no- or low-overhead logging while keeping the detailed messages available
when debug is enabled.

if status == PtpProcessDown {
p.processDownEvent(profileName, processName, ptpStats)
}
Expand Down Expand Up @@ -223,6 +225,7 @@ func (p *PTPEventManager) ExtractMetrics(msg string) {
if interfaceName == "" {
return // don't do if iface not known
}
log.Infof("DEBUG ExtractMetrics: offset parsed: process=%s iface=%s offset=%.0f syncState=%s", processName, interfaceName, ptpOffset, syncState)
// Only the ts2phc process reports an actual interface name, so allow every ts2phc message; for any other process allow only the master or clock-realtime interface.
if processName != ts2phcProcessName && !(interfaceName == master || interfaceName == ClockRealTime) {
return // only master and clock_realtime are supported
Expand Down
1 change: 1 addition & 0 deletions plugins/ptp_operator/metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func UpdateSyncStateMetrics(process, iface string, state ptp.SyncState) {
log.Errorf("wrong metrics are processed, ignoring interface %s with process %s", iface, process)
return
}
log.Infof("DEBUG UpdateSyncStateMetrics: process=%s iface=%s state=%s value=%.0f", process, iface, state, clockState)
SyncState.With(prometheus.Labels{
"process": process, "node": ptpNodeName, "iface": iface}).Set(clockState)
}
Expand Down
29 changes: 23 additions & 6 deletions plugins/ptp_operator/ptp_operator_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ const (
// restartCommand is the control command sent by linuxptp-daemon on the event
// socket to request a sidecar restart. The CMD prefix distinguishes it from
// process log lines (which always start with a process name like ptp4l, phc2sys).
restartCommand = "CMD RESTART"
restartCommand = "CMD RESTART"
// liveStartCommand is sent by linuxptp-daemon on each ptp4l process connection
// after the replay gate opens, indicating that all subsequent data is live.
liveStartCommand = "CMD LIVE_START"
phc2sysProcessName = "phc2sys"
ptp4lProcessName = "ptp4l"
ts2PhcProcessName = "ts2phc"
Expand Down Expand Up @@ -515,31 +518,45 @@ func listenToSocket(wg *sync.WaitGroup) {
}

func processMessages(c net.Conn) {
log.Infof("DEBUG processMessages: new connection accepted from %v", c.RemoteAddr())
<-aliasReady // wait until alias found and this is closed
log.Info("alias found, starting to process messages")
log.Info("DEBUG processMessages: aliasReady unblocked, processing can begin")
// Request a full state re-emit in a separate goroutine so the scanner
// can start reading immediately. TriggerLogs writes emit data back through
// this same socket connection; if we block here waiting for the HTTP response,
// nobody reads the socket, the kernel buffer fills, and the emit handler blocks.
// can start reading immediately. The /emit-logs handler writes replay
// data back through the EventHandler's socket connection, which CEP
// accepts as a separate processMessages goroutine. If TriggerLogs blocks
// here, that goroutine's TriggerLogs creates a recursive write-back to
// a connection whose reader is stuck in TriggerLogs — deadlock once the
// kernel buffer fills. The daemon's liveGate independently ensures no
// live data flows until replay completes, so async is safe.
if eventManager != nil {
log.Info("DEBUG processMessages: firing async TriggerLogs")
go func() {
if err := eventManager.TriggerLogs(); err != nil {
log.Warnf("failed to trigger logs on new connection: %v", err)
} else {
log.Info("DEBUG processMessages: TriggerLogs completed successfully")
}
}()
Comment on lines +533 to 540
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Replay/live ordering is still racy due to async TriggerLogs().

At Line 534, TriggerLogs() is launched in a goroutine and the scanner starts immediately, so live lines can be consumed before replay is triggered/completed. This defeats replay-before-live ordering guarantees.

Suggested fix
-	if eventManager != nil {
-		log.Info("DEBUG processMessages: firing async TriggerLogs")
-		go func() {
-			if err := eventManager.TriggerLogs(); err != nil {
-				log.Warnf("failed to trigger logs on new connection: %v", err)
-			} else {
-				log.Info("DEBUG processMessages: TriggerLogs completed successfully")
-			}
-		}()
-	}
+	if eventManager != nil {
+		log.Info("DEBUG processMessages: triggering logs before scan")
+		if err := eventManager.TriggerLogs(); err != nil {
+			log.Warnf("failed to trigger logs on new connection: %v; closing connection", err)
+			_ = c.Close()
+			return
+		}
+		log.Info("DEBUG processMessages: TriggerLogs completed successfully")
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
log.Info("DEBUG processMessages: firing async TriggerLogs")
go func() {
if err := eventManager.TriggerLogs(); err != nil {
log.Warnf("failed to trigger logs on new connection: %v", err)
} else {
log.Info("DEBUG processMessages: TriggerLogs completed successfully")
}
}()
if eventManager != nil {
log.Info("DEBUG processMessages: triggering logs before scan")
if err := eventManager.TriggerLogs(); err != nil {
log.Warnf("failed to trigger logs on new connection: %v; closing connection", err)
_ = c.Close()
return
}
log.Info("DEBUG processMessages: TriggerLogs completed successfully")
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/ptp_operator/ptp_operator_plugin.go` around lines 533 - 540, The
current async launch of TriggerLogs in processMessages causes race between
replay and live scanning; change it so TriggerLogs is executed synchronously (or
block until it finishes) before the scanner starts: locate the processMessages
function where eventManager.TriggerLogs() is called (currently inside a go func)
and remove the goroutine, or replace it with explicit synchronization (e.g., a
channel or WaitGroup) that waits for TriggerLogs to complete and surface any
error before proceeding to start the scanner; ensure the log.Warnf/log.Info
handling remains but runs after TriggerLogs returns so replay completes before
live consumption begins.

}
scanner := bufio.NewScanner(c)
for {
ok := scanner.Scan()
if !ok {
log.Error("error reading socket input, retrying")
log.Errorf("DEBUG processMessages: scanner returned false (conn closed or error), breaking")
break
}
msg := scanner.Text()
if msg == restartCommand {
log.Info("DEBUG processMessages: received CMD RESTART")
restartProcess("restart requested by daemon via socket")
return // unreachable after exec
}
if msg == liveStartCommand {
log.Info("DEBUG processMessages: received LIVE_START marker - live data follows")
continue
}
log.Debugf("DEBUG processMessages: dispatching to ExtractMetrics: %.120s", msg)
eventManager.ExtractMetrics(msg)
}
}
Expand Down
213 changes: 213 additions & 0 deletions plugins/ptp_operator/ptp_operator_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"os"
"path"
"strings"
"sync"
"testing"
"time"

v2 "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
Expand Down Expand Up @@ -407,6 +410,216 @@ func buildEvent(node string, source ptpEvent.EventResource, eventType ptpEvent.E
return e
}

// startMockLogsServer starts a minimal HTTP server on the logsEndpoint port
// (8081) so that eventManager.TriggerLogs() succeeds during tests.
//
// The server answers every request to /emit-logs with 200 OK. Binding the
// port is retried up to ten times, 50ms apart, and the test is failed with
// t.Fatalf if the port never becomes available.
//
// Returns a cleanup function that shuts down the server and waits for the
// serving goroutine to exit, so the listener is closed by the time the
// cleanup returns. The cleanup may be called more than once.
func startMockLogsServer(t *testing.T) func() {
	t.Helper()
	const (
		addr          = ":8081"
		listenRetries = 10
		retryDelay    = 50 * time.Millisecond
	)

	mux := http.NewServeMux()
	mux.HandleFunc("/emit-logs", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	srv := &http.Server{
		Addr:    addr,
		Handler: mux,
		// Bound header reads so a stalled client cannot pin a connection.
		ReadHeaderTimeout: 5 * time.Second,
	}

	var ln net.Listener
	var err error
	for i := 0; i < listenRetries; i++ {
		if ln, err = net.Listen("tcp", addr); err == nil {
			break
		}
		// The port may still be held by a server from a previous test.
		time.Sleep(retryDelay)
	}
	if err != nil {
		t.Fatalf("failed to listen on :8081 for mock logs server: %v", err)
	}

	served := make(chan struct{})
	go func() {
		defer close(served)
		// Serve always returns a non-nil error; after Close it is
		// http.ErrServerClosed, which is expected here.
		_ = srv.Serve(ln)
	}()
	return func() {
		_ = srv.Close()
		// Serve closes the listener on its way out, even when Close won the
		// race and ran first; waiting here makes the port release
		// deterministic for the next test.
		<-served
	}
}

// setupProcessMessages installs test values for the two package globals that
// processMessages reads: aliasReady is replaced by an already-closed channel
// so the receive in processMessages completes immediately, and eventManager
// is replaced by a fresh manager switched into mock-test mode.
//
// The returned function puts the previous values of both globals back.
func setupProcessMessages(t *testing.T) func() {
	t.Helper()
	savedAliasReady, savedEventManager := aliasReady, eventManager

	ready := make(chan struct{})
	close(ready)
	aliasReady = ready

	manager := metrics.NewPTPEventManager(resourcePrefix, pubsubTypes, nodeName, &common.SCConfiguration{})
	manager.MockTest(true)
	eventManager = manager

	return func() {
		aliasReady, eventManager = savedAliasReady, savedEventManager
	}
}

// TestProcessMessages_LiveStartSkipped checks that processMessages consumes a
// single CMD LIVE_START line and then returns once the peer closes the
// connection.
func TestProcessMessages_LiveStartSkipped(t *testing.T) {
	cleanup := startMockLogsServer(t)
	defer cleanup()
	restore := setupProcessMessages(t)
	defer restore()

	serverConn, clientConn := net.Pipe()
	// processMessages does not close its connection in the code under test;
	// closing it here also unblocks the reader if the test fails early.
	defer func() { _ = serverConn.Close() }()
	// net.Pipe is unbuffered: a write blocks until the peer reads it. Without
	// a deadline a reader that never reads would hang the test in Fprintf and
	// the timeout below would never be reached.
	assert.NoError(t, clientConn.SetWriteDeadline(time.Now().Add(5*time.Second)))

	done := make(chan struct{})
	go func() {
		processMessages(serverConn)
		close(done)
	}()

	_, err := fmt.Fprintf(clientConn, "%s\n", liveStartCommand)
	assert.NoError(t, err)
	_ = clientConn.Close()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("processMessages did not return after CMD LIVE_START + EOF")
	}
}

// TestProcessMessages_LiveStartBetweenRegularMessages feeds processMessages a
// stream in which CMD LIVE_START markers are interleaved with ordinary ptp4l
// and phc2sys log lines, and checks that it reads the whole stream and
// returns once the peer closes the connection.
func TestProcessMessages_LiveStartBetweenRegularMessages(t *testing.T) {
	cleanup := startMockLogsServer(t)
	defer cleanup()
	restore := setupProcessMessages(t)
	defer restore()

	serverConn, clientConn := net.Pipe()
	// processMessages does not close its connection in the code under test;
	// closing it here also unblocks the reader if the test fails early.
	defer func() { _ = serverConn.Close() }()
	// net.Pipe is unbuffered: each write blocks until the peer reads it.
	// Bound the writes so a stalled reader fails the test instead of hanging
	// it before the timeout below is ever reached.
	assert.NoError(t, clientConn.SetWriteDeadline(time.Now().Add(5*time.Second)))

	done := make(chan struct{})
	go func() {
		processMessages(serverConn)
		close(done)
	}()

	msgs := []string{
		"ptp4l[123.456]: [ptp4l.0.config] master offset 5 s2 freq -1000 path delay 100",
		liveStartCommand,
		"ptp4l[123.457]: [ptp4l.0.config] master offset 3 s2 freq -998 path delay 99",
		liveStartCommand,
		"phc2sys[123.458]: [ptp4l.0.config] CLOCK_REALTIME phc offset 10 s2 freq -500 delay 200",
	}
	for _, m := range msgs {
		_, err := fmt.Fprintf(clientConn, "%s\n", m)
		assert.NoError(t, err)
	}
	_ = clientConn.Close()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("processMessages did not return after mixed LIVE_START and regular messages")
	}
}

// TestProcessMessages_TriggerLogsFailureNonBlocking checks that a failing
// TriggerLogs call does not stop processMessages from reading its connection.
func TestProcessMessages_TriggerLogsFailureNonBlocking(t *testing.T) {
	// No mock HTTP server → TriggerLogs will fail, but since it's async
	// the scanner should still process messages on this connection.
	restore := setupProcessMessages(t)
	defer restore()

	serverConn, clientConn := net.Pipe()
	// processMessages does not close its connection in the code under test;
	// closing it here also unblocks the reader if the test fails early.
	defer func() { _ = serverConn.Close() }()
	// net.Pipe is unbuffered: a write blocks until the peer reads it. If
	// processMessages ever stopped reading (the regression this test guards
	// against), an unbounded write would hang the test instead of failing it.
	assert.NoError(t, clientConn.SetWriteDeadline(time.Now().Add(5*time.Second)))

	done := make(chan struct{})
	go func() {
		processMessages(serverConn)
		close(done)
	}()

	// Even though TriggerLogs fails (no HTTP server), processMessages must
	// still read from the connection because TriggerLogs is async.
	_, err := fmt.Fprintf(clientConn, "%s\n", liveStartCommand)
	assert.NoError(t, err)
	_ = clientConn.Close()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("processMessages should still process messages when TriggerLogs fails async")
	}
}

// TestProcessMessages_MultipleLiveStartOnly checks that processMessages
// tolerates several consecutive CMD LIVE_START lines with no other traffic
// and returns once the peer closes the connection.
func TestProcessMessages_MultipleLiveStartOnly(t *testing.T) {
	cleanup := startMockLogsServer(t)
	defer cleanup()
	restore := setupProcessMessages(t)
	defer restore()

	serverConn, clientConn := net.Pipe()
	// processMessages does not close its connection in the code under test;
	// closing it here also unblocks the reader if the test fails early.
	defer func() { _ = serverConn.Close() }()
	// net.Pipe is unbuffered: each write blocks until the peer reads it.
	// Bound the writes so a stalled reader fails the test instead of hanging
	// it before the timeout below is ever reached.
	assert.NoError(t, clientConn.SetWriteDeadline(time.Now().Add(5*time.Second)))

	done := make(chan struct{})
	go func() {
		processMessages(serverConn)
		close(done)
	}()

	for i := 0; i < 5; i++ {
		_, err := fmt.Fprintf(clientConn, "%s\n", liveStartCommand)
		assert.NoError(t, err)
	}
	_ = clientConn.Close()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("processMessages did not return after multiple CMD LIVE_START messages")
	}
}

// TestLiveStartCommand_ConstantValue pins the wire value of liveStartCommand
// and checks that it differs from restartCommand while keeping the "CMD "
// prefix.
func TestLiveStartCommand_ConstantValue(t *testing.T) {
	check := assert.New(t)

	check.Equal("CMD LIVE_START", liveStartCommand)
	check.NotEqual(restartCommand, liveStartCommand,
		"LIVE_START and RESTART must be distinct commands")

	hasCmdPrefix := strings.HasPrefix(liveStartCommand, "CMD ")
	check.True(hasCmdPrefix,
		"control commands should use CMD prefix to distinguish from log lines")
}

// TestProcessMessages_AliasReadyGating checks that processMessages stays
// blocked while aliasReady is open, and that it proceeds to read the
// connection and return once aliasReady is closed and the peer disconnects.
func TestProcessMessages_AliasReadyGating(t *testing.T) {
	cleanup := startMockLogsServer(t)
	defer cleanup()

	oldAliasReady := aliasReady
	oldEventManager := eventManager
	defer func() {
		aliasReady = oldAliasReady
		eventManager = oldEventManager
	}()

	aliasReady = make(chan struct{}) // NOT closed — processMessages should block
	scCfg := &common.SCConfiguration{}
	eventManager = metrics.NewPTPEventManager(resourcePrefix, pubsubTypes, nodeName, scCfg)
	eventManager.MockTest(true)

	serverConn, clientConn := net.Pipe()
	// processMessages does not close its connection in the code under test;
	// closing it here also unblocks the reader if the test fails early.
	defer func() { _ = serverConn.Close() }()
	defer func() { _ = clientConn.Close() }()

	started := make(chan struct{})
	done := make(chan struct{})
	go func() {
		close(started)
		processMessages(serverConn)
		close(done)
	}()
	<-started

	// Give processMessages a moment to run; it must still be parked on
	// aliasReady when the timer fires.
	select {
	case <-done:
		t.Fatal("processMessages should block while aliasReady is open")
	case <-time.After(200 * time.Millisecond):
	}

	close(aliasReady)

	// net.Pipe is unbuffered: a write blocks until the peer reads it. Bound
	// the write so a reader that never resumes fails the test instead of
	// hanging it before the timeout below is ever reached.
	assert.NoError(t, clientConn.SetWriteDeadline(time.Now().Add(5*time.Second)))
	_, err := fmt.Fprintf(clientConn, "%s\n", liveStartCommand)
	assert.NoError(t, err)
	_ = clientConn.Close()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("processMessages did not unblock after aliasReady closed")
	}
}

func getMockOverrideFn() func(e v2.Event, d *channel.DataChan) error {
return func(e v2.Event, d *channel.DataChan) error {
if e.Source() != "" {
Expand Down
Loading