-
Notifications
You must be signed in to change notification settings - Fork 26
OCPBUGS-85092: make TriggerLogs synchronous and handle CMD LIVE_START #683
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -55,7 +55,10 @@ const ( | |||||||||||||||||||||||||||||||||||
| // restartCommand is the control command sent by linuxptp-daemon on the event | ||||||||||||||||||||||||||||||||||||
| // socket to request a sidecar restart. The CMD prefix distinguishes it from | ||||||||||||||||||||||||||||||||||||
| // process log lines (which always start with a process name like ptp4l, phc2sys). | ||||||||||||||||||||||||||||||||||||
| restartCommand = "CMD RESTART" | ||||||||||||||||||||||||||||||||||||
| restartCommand = "CMD RESTART" | ||||||||||||||||||||||||||||||||||||
| // liveStartCommand is sent by linuxptp-daemon on each ptp4l process connection | ||||||||||||||||||||||||||||||||||||
| // after the replay gate opens, indicating that all subsequent data is live. | ||||||||||||||||||||||||||||||||||||
| liveStartCommand = "CMD LIVE_START" | ||||||||||||||||||||||||||||||||||||
| phc2sysProcessName = "phc2sys" | ||||||||||||||||||||||||||||||||||||
| ptp4lProcessName = "ptp4l" | ||||||||||||||||||||||||||||||||||||
| ts2PhcProcessName = "ts2phc" | ||||||||||||||||||||||||||||||||||||
|
|
@@ -515,31 +518,45 @@ func listenToSocket(wg *sync.WaitGroup) { | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| func processMessages(c net.Conn) { | ||||||||||||||||||||||||||||||||||||
| log.Infof("DEBUG processMessages: new connection accepted from %v", c.RemoteAddr()) | ||||||||||||||||||||||||||||||||||||
| <-aliasReady // wait until alias found and this is closed | ||||||||||||||||||||||||||||||||||||
| log.Info("alias found, starting to process messages") | ||||||||||||||||||||||||||||||||||||
| log.Info("DEBUG processMessages: aliasReady unblocked, processing can begin") | ||||||||||||||||||||||||||||||||||||
| // Request a full state re-emit in a separate goroutine so the scanner | ||||||||||||||||||||||||||||||||||||
| // can start reading immediately. TriggerLogs writes emit data back through | ||||||||||||||||||||||||||||||||||||
| // this same socket connection; if we block here waiting for the HTTP response, | ||||||||||||||||||||||||||||||||||||
| // nobody reads the socket, the kernel buffer fills, and the emit handler blocks. | ||||||||||||||||||||||||||||||||||||
| // can start reading immediately. The /emit-logs handler writes replay | ||||||||||||||||||||||||||||||||||||
| // data back through the EventHandler's socket connection, which CEP | ||||||||||||||||||||||||||||||||||||
| // accepts as a separate processMessages goroutine. If TriggerLogs blocks | ||||||||||||||||||||||||||||||||||||
| // here, that goroutine's TriggerLogs creates a recursive write-back to | ||||||||||||||||||||||||||||||||||||
| // a connection whose reader is stuck in TriggerLogs — deadlock once the | ||||||||||||||||||||||||||||||||||||
| // kernel buffer fills. The daemon's liveGate independently ensures no | ||||||||||||||||||||||||||||||||||||
| // live data flows until replay completes, so async is safe. | ||||||||||||||||||||||||||||||||||||
| if eventManager != nil { | ||||||||||||||||||||||||||||||||||||
| log.Info("DEBUG processMessages: firing async TriggerLogs") | ||||||||||||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||||||||||||
| if err := eventManager.TriggerLogs(); err != nil { | ||||||||||||||||||||||||||||||||||||
| log.Warnf("failed to trigger logs on new connection: %v", err) | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| log.Info("DEBUG processMessages: TriggerLogs completed successfully") | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+533
to
540
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replay/live ordering is still racy due to async At Line 534, Suggested fix- if eventManager != nil {
- log.Info("DEBUG processMessages: firing async TriggerLogs")
- go func() {
- if err := eventManager.TriggerLogs(); err != nil {
- log.Warnf("failed to trigger logs on new connection: %v", err)
- } else {
- log.Info("DEBUG processMessages: TriggerLogs completed successfully")
- }
- }()
- }
+ if eventManager != nil {
+ log.Info("DEBUG processMessages: triggering logs before scan")
+ if err := eventManager.TriggerLogs(); err != nil {
+ log.Warnf("failed to trigger logs on new connection: %v; closing connection", err)
+ _ = c.Close()
+ return
+ }
+ log.Info("DEBUG processMessages: TriggerLogs completed successfully")
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| scanner := bufio.NewScanner(c) | ||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||
| ok := scanner.Scan() | ||||||||||||||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||||||||||||||
| log.Error("error reading socket input, retrying") | ||||||||||||||||||||||||||||||||||||
| log.Errorf("DEBUG processMessages: scanner returned false (conn closed or error), breaking") | ||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| msg := scanner.Text() | ||||||||||||||||||||||||||||||||||||
| if msg == restartCommand { | ||||||||||||||||||||||||||||||||||||
| log.Info("DEBUG processMessages: received CMD RESTART") | ||||||||||||||||||||||||||||||||||||
| restartProcess("restart requested by daemon via socket") | ||||||||||||||||||||||||||||||||||||
| return // unreachable after exec | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| if msg == liveStartCommand { | ||||||||||||||||||||||||||||||||||||
| log.Info("DEBUG processMessages: received LIVE_START marker - live data follows") | ||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| log.Debugf("DEBUG processMessages: dispatching to ExtractMetrics: %.120s", msg) | ||||||||||||||||||||||||||||||||||||
| eventManager.ExtractMetrics(msg) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Infofin the hot parser path will flood logs and hurt throughput.Line 124 and Line 228 log per-message parse details at info level inside
ExtractMetrics, which is a high-frequency path. This can create heavy I/O and noisy operational logs under load.Suggested fix
Also applies to: 228-228
🤖 Prompt for AI Agents