Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ jobs:

build:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- uses: actions/checkout@v2

Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ OPTIONS:
} else {
outputHandler = log.StreamHandler(os.Stdout, log.TerminalFormat())
}
logLvl, err_level := log.LvlFromString(*logLevel)
if err_level != nil {
logLvl, errLevel := log.LvlFromString(*logLevel)
if errLevel != nil {
fmt.Fprintf(os.Stderr, "Error: Unknown loglevel\n\n")
flag.Usage()
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion plugins/lines/lines.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import "github.com/ts2/ts2-sim-server/simulation"
type StandardManager struct{}

// IsFailed returns whether the track circuit of the given line item is failed or not
func (sm StandardManager) IsFailed(p *simulation.LineItem) bool {
func (sm StandardManager) IsFailed(_ *simulation.LineItem) bool {
return false
}

Expand Down
6 changes: 0 additions & 6 deletions plugins/points/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,16 @@ package points

import (
"github.com/ts2/ts2-sim-server/simulation"
"sync"
)

// StandardManager is a points manager that performs points change
// immediately and never fails.
type StandardManager struct {
sync.RWMutex
directions map[string]simulation.PointDirection
}

// Direction returns the direction of the points
func (sm *StandardManager) Direction(p *simulation.PointsItem) simulation.PointDirection {
sm.RLock()
defer sm.RUnlock()
return sm.directions[p.ID()]
}

Expand All @@ -45,8 +41,6 @@ func (sm *StandardManager) SetDirection(p *simulation.PointsItem, dir simulation
if dir == simulation.DirectionCurrent {
return
}
sm.Lock()
defer sm.Unlock()
sm.directions[p.ID()] = dir
if p.PairedItem() != nil {
sm.directions[p.PairedItem().ID()] = dir
Expand Down
2 changes: 1 addition & 1 deletion plugins/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (sm StandardManager) CanActivate(r *simulation.Route) error {

// CanDeactivate returns an error if the given route cannot be deactivated.
// In this implementation, it always returns true.
func (sm StandardManager) CanDeactivate(r *simulation.Route) error {
func (sm StandardManager) CanDeactivate(_ *simulation.Route) error {
return nil
}

Expand Down
35 changes: 30 additions & 5 deletions server/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"fmt"
"net"
"sync"

"github.com/gorilla/websocket"
)
Expand All @@ -37,12 +38,13 @@ type ManagerType string

// connection is a wrapper around the websocket.Conn
type connection struct {
websocket.Conn
*websocket.Conn
sync.RWMutex
// pushChan is the channel on which pushed messaged are sent
pushChan chan interface{}
clientType ClientType
ManagerType ManagerType
Requests []Request
managerType ManagerType
requests []Request
}

// loop starts the reading and writing loops of the connection.
Expand Down Expand Up @@ -82,7 +84,7 @@ func (conn *connection) processRead(ctx context.Context) {
continue
}
}
conn.Requests = append(conn.Requests, req)
conn.appendRequest(req)
hub.readChan <- conn
}
}
Expand Down Expand Up @@ -130,10 +132,33 @@ func (conn *connection) registerClient() (error, *Request) {
logger.Info("Error while writing", "connection", conn.RemoteAddr(), "request", "NewOkResponse", "error", err)
}
hub.registerChan <- conn
logger.Info("Registered client", "connection", conn.RemoteAddr(), "clientType", conn.clientType, "managerType", conn.ManagerType)
logger.Info("Registered client", "connection", conn.RemoteAddr(), "clientType", conn.clientType, "managerType", conn.managerType)
return nil, req
}

// popRequest pops the first request of this connection
func (conn *connection) popRequest() Request {
conn.Lock()
defer conn.Unlock()
req := conn.requests[0]
conn.requests = conn.requests[1:]
return req
}

// getRequest returns the first request of this connection
func (conn *connection) getRequest() Request {
conn.RLock()
defer conn.RUnlock()
return conn.requests[0]
}

// appendRequest appends the given request to the requests list
func (conn *connection) appendRequest(req Request) {
conn.Lock()
defer conn.Unlock()
conn.requests = append(conn.requests, req)
}

// Close terminates the websocket connection and closes associated resources
func (conn *connection) Close() error {
_ = conn.Conn.Close()
Expand Down
20 changes: 20 additions & 0 deletions server/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,31 @@ import (

"github.com/gorilla/websocket"
. "github.com/smartystreets/goconvey/convey"
"github.com/ts2/ts2-sim-server/simulation"
)

func TestConnection(t *testing.T) {
// Wait for server to come up
time.Sleep(2 * time.Second)
Convey("Testing server connection", t, func() {
stopChan := make(chan struct{})
// Creating a few concurrent connections
for i := 0; i < nbGoroutines; i++ {
go func() {
conn := clientDial(t)
defer conn.Close()
register(t, conn, Client, "", "client-secret")
addListener(t, conn, simulation.TrackItemChangedEvent)
addListener(t, conn, simulation.RouteActivatedEvent)
addListener(t, conn, simulation.RouteDeactivatedEvent)
addListener(t, conn, simulation.ClockEvent)
addListener(t, conn, simulation.TrainChangedEvent)
addListener(t, conn, simulation.MessageReceivedEvent)
addListener(t, conn, simulation.OptionsChangedEvent)
addListener(t, conn, simulation.StateChangedEvent)
<-stopChan
}()
}
c := clientDial(t)
Convey("Login test", func() {
Convey("First request that is not a register request should fail", func() {
Expand Down Expand Up @@ -70,6 +89,7 @@ func TestConnection(t *testing.T) {
})
Reset(func() {
err := c.Close()
close(stopChan)
So(err, ShouldBeNil)
})
})
Expand Down
2 changes: 1 addition & 1 deletion server/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
// Free Software Foundation, Inc.,
// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

// Package `server` implements the http/websocket front end for the ts2 simulator
// Package server implements the http/websocket front end for the ts2 simulator
package server
64 changes: 27 additions & 37 deletions server/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package server

import (
"fmt"
"sync"

"github.com/ts2/ts2-sim-server/simulation"
)
Expand All @@ -33,14 +32,8 @@ type Hub struct {
// Registry of client listeners
registry map[registryEntry]map[*connection]bool

// registryMutex protects the registry
registryMutex sync.RWMutex

// lastEvents holds the last event sent for each registryEntry
lastEvents map[registryEntry]*simulation.Event

// lastEventsMutex protects the lastEvents map
lastEventsMutex sync.RWMutex
lastEvents map[registryEntry]simulation.Event

// Register requests from the connection
registerChan chan *connection
Expand All @@ -64,28 +57,31 @@ func (h *Hub) run(hubUp chan bool) {

hubUp <- true
var (
e *simulation.Event
e simulation.Event
c *connection
)
for {
select {
case e = <-sim.EventChan:
logger.Debug("Received event from simulation", "submodule", "hub", "event", e.Name, "object", e.Object)
h.notifyClients(e)
case c = <-h.readChan:
logger.Debug("Reading request from client", "submodule", "hub", "data", c.Requests[0])
go h.dispatchObject(c)
logger.Debug("Reading request from client", "submodule", "hub", "data", c.getRequest())
h.dispatchObject(c)
case c = <-h.registerChan:
logger.Debug("Registering connection", "submodule", "hub", "connection", c.RemoteAddr())
h.register(c)
case c = <-h.unregisterChan:
logger.Info("Unregistering connection", "submodule", "hub", "connection", c.RemoteAddr())
h.unregister(c)
case e = <-sim.EventChan:
logger.Debug("Received event from simulation", "submodule", "hub", "event", e.Name, "object", e.Object)
if e.Name == simulation.ClockEvent {
sim.ProcessTimeStep()
}
h.notifyClients(e)
}
}
}

// register registers the given connection to this hub
// register the given connection to this hub
func (h *Hub) register(c *connection) {
switch c.clientType {
case Client:
Expand All @@ -95,8 +91,6 @@ func (h *Hub) register(c *connection) {

// addConnectionToRegistry adds this connection to the registry for eventName and id.
func (h *Hub) addConnectionToRegistry(conn *connection, eventName simulation.EventName, id string) {
h.registryMutex.Lock()
defer h.registryMutex.Unlock()
re := registryEntry{eventName: eventName, id: id}
if _, ok := h.registry[re]; !ok {
h.registry[re] = make(map[*connection]bool)
Expand All @@ -106,24 +100,20 @@ func (h *Hub) addConnectionToRegistry(conn *connection, eventName simulation.Eve

// removeEntryFromRegistry removes this connection from the registry for eventName and id.
func (h *Hub) removeEntryFromRegistry(conn *connection, eventName simulation.EventName, id string) {
h.registryMutex.Lock()
defer h.registryMutex.Unlock()
re := registryEntry{eventName: eventName, id: id}
delete(h.registry[re], conn)
}

// removeConnectionFromRegistry removes all entries of this connection in the registry.
func (h *Hub) removeConnectionFromRegistry(conn *connection) {
h.registryMutex.Lock()
defer h.registryMutex.Unlock()
for re, rv := range h.registry {
if _, ok := rv[conn]; ok {
delete(h.registry, re)
delete(h.registry[re], conn)
}
}
}

// unregister unregisters the connection to this hub
// unregister the connection to this hub
func (h *Hub) unregister(c *connection) {
switch c.clientType {
case Client:
Expand All @@ -135,39 +125,39 @@ func (h *Hub) unregister(c *connection) {
}

// notifyClients sends the given event to all registered clients.
func (h *Hub) notifyClients(e *simulation.Event) {
func (h *Hub) notifyClients(e simulation.Event) {
logger.Debug("Notifying clients", "submodule", "hub", "event", e)
h.updateLastEvents(e)
h.registryMutex.RLock()
defer h.registryMutex.RUnlock()
// Notify clients that subscribed to all objects
for conn := range h.registry[registryEntry{eventName: e.Name, id: ""}] {
conn.pushChan <- NewNotificationResponse(e)
resp := NewNotificationResponse(e)
// Send notification in another goroutine so as not to block
go func(c* connection) {
c.pushChan <- resp
}(conn)
}
if e.Object.ID() == "" {
// Object has no ID. Don't send twice
return
}
// Notify clients that subscribed to specific object IDs
for conn := range h.registry[registryEntry{eventName: e.Name, id: e.Object.ID()}] {
conn.pushChan <- NewNotificationResponse(e)
resp := NewNotificationResponse(e)
// Send notification in another goroutine so as not to block
go func(c* connection) {
c.pushChan <- resp
}(conn)
}
}

// updateLastEvents updates the lastEvents map in a concurrently safe way
func (h *Hub) updateLastEvents(e *simulation.Event) {
h.lastEventsMutex.Lock()
defer h.lastEventsMutex.Unlock()
func (h *Hub) updateLastEvents(e simulation.Event) {
h.lastEvents[registryEntry{eventName: e.Name, id: e.Object.ID()}] = e
}

// dispatchObject process a request.
//
// - req is the request to process
// - ch is the channel on which to send the response
func (h *Hub) dispatchObject(conn *connection) {
req := conn.Requests[0]
conn.Requests = conn.Requests[1:]
req := conn.popRequest()
obj, ok := h.objects[req.Object]
if !ok {
conn.pushChan <- NewErrorResponse(req.ID, fmt.Errorf("unknown object %s", req.Object))
Expand All @@ -184,7 +174,7 @@ func newHub() *Hub {
h.clientConnections = make(map[*connection]bool)
// make registry map
h.registry = make(map[registryEntry]map[*connection]bool)
h.lastEvents = make(map[registryEntry]*simulation.Event)
h.lastEvents = make(map[registryEntry]simulation.Event)
// make channels
h.registerChan = make(chan *connection)
h.unregisterChan = make(chan *connection)
Expand Down
2 changes: 1 addition & 1 deletion server/hub_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
type optionObject struct{}

// dispatch processes requests made on the Option object
func (s *optionObject) dispatch(h *Hub, req Request, conn *connection) {
func (s *optionObject) dispatch(_ *Hub, req Request, conn *connection) {
ch := conn.pushChan
switch req.Action {
case "list":
Expand Down
2 changes: 1 addition & 1 deletion server/hub_place.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
type placeObject struct{}

// dispatch processes requests made on the Place object
func (s *placeObject) dispatch(h *Hub, req Request, conn *connection) {
func (s *placeObject) dispatch(_ *Hub, req Request, conn *connection) {
ch := conn.pushChan
switch req.Action {
case "list":
Expand Down
2 changes: 1 addition & 1 deletion server/hub_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
type routeObject struct{}

// dispatch processes requests made on the route object
func (r *routeObject) dispatch(h *Hub, req Request, conn *connection) {
func (r *routeObject) dispatch(_ *Hub, req Request, conn *connection) {
ch := conn.pushChan
switch req.Action {
case "list":
Expand Down
4 changes: 1 addition & 3 deletions server/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ func (h *Hub) removeRegistryEntry(req Request, conn *connection) error {
}

// renotifyClient will resend the last notification for each event and object ID
func (h *Hub) renotifyClient(req Request, conn *connection) error {
h.lastEventsMutex.RLock()
defer h.lastEventsMutex.RUnlock()
func (h *Hub) renotifyClient(_ Request, conn *connection) error {
for re, event := range h.lastEvents {
if _, ok := h.registry[registryEntry{eventName: re.eventName, id: ""}]; ok {
if h.registry[registryEntry{eventName: event.Name, id: ""}][conn] {
Expand Down
2 changes: 1 addition & 1 deletion server/hub_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
type serviceObject struct{}

// dispatch processes requests made on the Service object
func (s *serviceObject) dispatch(h *Hub, req Request, conn *connection) {
func (s *serviceObject) dispatch(_ *Hub, req Request, conn *connection) {
ch := conn.pushChan
switch req.Action {
case "list":
Expand Down
Loading