Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 66 additions & 53 deletions cmd/managementd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,66 +278,79 @@ func main() {
})
})

go func() {
for {
err := os.Remove(frameSocket)
if err != nil && !os.IsNotExist(err) {
log.Printf("Couldn't remove %v %v\n", frameSocket, err)
time.Sleep(1000)
continue
}

listener, err := net.Listen("unix", frameSocket)
if err != nil {
log.Println("Couldn't make socket", err)
return
}
log.Print("waiting for frames from tc2-agent")

listener.(*net.UnixListener).SetDeadline(time.Now().Add(5 * time.Second))
conn, err := listener.Accept()
if err != nil {
if err.(net.Error).Timeout() {
log.Printf("socket accept timed out, retrying...")
// Handle the frame listener in a separate goroutine
go handleFrameListenerLoop()

if hasActiveClients() {
listener.(*net.UnixListener).SetDeadline(time.Now().Add(30 * time.Second))
// If there are users connected via web sockets, force the frames to get served.
log.Println("Websocket has clients, forcing frame priority")
tc2AgentDbus, err := GetTC2AgentDbus()
if err != nil {
log.Println(err)
return
}
var result string
err = tc2AgentDbus.Call("org.cacophony.TC2Agent.prioritiseframeserve", 0).Store(&result)
if err != nil {
log.Println(err)
return
}
}
listenAddr := fmt.Sprintf(":%d", config.Port)
log.Printf("listening on %s", listenAddr)
log.Fatal(http.ListenAndServe(listenAddr, router))
}

continue
}
log.Printf("socket accept failed: %v", err)
continue
}
func handleFrameListenerLoop() {
for {
err := handleFrameListener()
if err != nil {
log.Errorf("Error handling frame listener: %v", err)
}
log.Info("Will retry connection in 5 seconds")
time.Sleep(5 * time.Second)
}
}

// Prevent concurrent connections.
listener.Close()
func handleFrameListener() error {
for {
// Remove old frame socket
err := os.Remove(frameSocket)
if err != nil && !os.IsNotExist(err) {
log.Errorf("Couldn't remove frame socket '%s'", frameSocket)
return err
}

log.Printf("accepted connection from client")
err = handleConn(conn)
frameCh <- &FrameData{Disconnected: true}
log.Printf("camera connection ended with: %v", err)
connected.Store(false)
// Make new socket
listener, err := net.Listen("unix", frameSocket)
if err != nil {
log.Errorf("Couldn't make socket '%s'", frameSocket)
return err
}

// Prioritise frames if there are active clients
log.Info("Waiting for frames from tc2-agent")
deadLineDuration := 5 * time.Second
if hasActiveClients() {
deadLineDuration = 30 * time.Second
tc2AgentDbus, err := GetTC2AgentDbus()
if err != nil {
log.Error("Failed to get TC2AgentDbus")
return err
}
var result string
err = tc2AgentDbus.Call("org.cacophony.TC2Agent.prioritiseframeserve", 0).Store(&result)
if err != nil {
log.Errorf("Failed to request frame serve priority from rp2040, %s", result)
return err
}
}
}()

listenAddr := fmt.Sprintf(":%d", config.Port)
log.Printf("listening on %s", listenAddr)
log.Fatal(http.ListenAndServe(listenAddr, router))
// Wait for and accept connection
listener.(*net.UnixListener).SetDeadline(time.Now().Add(deadLineDuration))
conn, err := listener.Accept()
if err != nil && err.(net.Error).Timeout() {
log.Info("Socket accept timed out, retrying...")
continue
} else if err != nil {
log.Errorf("Socket accept failed: %v", err)
return err
}
// Prevent concurrent connections.
listener.Close()

// Handle connection
log.Info("Accepted connection from client")
err = handleConn(conn)
frameCh <- &FrameData{Disconnected: true}
log.Infof("Camera connection ended with: %v", err)
connected.Store(false)
}
}

func handleConn(conn net.Conn) error {
Expand Down
Loading