From 528c3fddf2a36c1b06cf1f8b4ab99c2be3bf1791 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 5 Dec 2025 19:46:43 +0100 Subject: [PATCH 01/16] Conn.ListenWebsocket(): use a callback to process incoming messages Instead of shoving incoming messages into a channel, pass them to a callback of type `Subscriber`. Have `App` deal with the channel on its side (for now). --- app.go | 13 ++++++++++++- internal/websocket/reader.go | 12 +++++------- internal/websocket/subscriptions.go | 5 +++++ 3 files changed, 22 insertions(+), 8 deletions(-) create mode 100644 internal/websocket/subscriptions.go diff --git a/app.go b/app.go index dbd512f..f997bab 100644 --- a/app.go +++ b/app.go @@ -344,7 +344,18 @@ func (app *App) Start() { // entity listeners and event listeners elChan := make(chan websocket.ChanMsg) - go app.conn.ListenWebsocket(elChan) + go func() { + err := app.conn.ListenWebsocket( + func(msg websocket.ChanMsg) { + elChan <- msg + }, + ) + if err != nil { + close(elChan) + slog.Error("Error reading from websocket", "err", err) + return + } + }() for { msg, ok := <-elChan diff --git a/internal/websocket/reader.go b/internal/websocket/reader.go index 1dfc3ca..d1efaaa 100644 --- a/internal/websocket/reader.go +++ b/internal/websocket/reader.go @@ -19,15 +19,13 @@ type ChanMsg struct { } // ListenWebsocket reads JSON-formatted messages from `conn`, partly -// deserializes them, and sends them to `c`. If there is an error, -// close `c` and return. -func (conn *Conn) ListenWebsocket(c chan<- ChanMsg) { +// deserializes them, and passes them to `subscriber`. If there is an +// error, return the error. +func (conn *Conn) ListenWebsocket(subscriber Subscriber) error { for { bytes, err := conn.readMessage() if err != nil { - slog.Error("Error reading from websocket", "err", err) - close(c) - return + return err } base := BaseMessage{ @@ -45,6 +43,6 @@ func (conn *Conn) ListenWebsocket(c chan<- ChanMsg) { Raw: bytes, } - c <- chanMsg + subscriber(chanMsg) } } diff --git a/internal/websocket/subscriptions.go b/internal/websocket/subscriptions.go new file mode 100644 index 0000000..2ef08ae --- /dev/null +++ b/internal/websocket/subscriptions.go @@ -0,0 +1,5 @@ +package websocket + +// Subscriber is called synchronously when a message is received that +// matches its subscription. +type Subscriber func(msg ChanMsg) From f7af49b11c998645ef853661808fac695207de73 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 00:04:34 +0100 Subject: [PATCH 02/16] LockedConn: add a mechanism for subscribing to particular message IDs It will be easier to route messages to the correct subscriber, and prevent subscribers from interfering with each other, if subscribing at the `Conn` level is done by message ID. Add a mechanism for doing so. For now, messages are also passed to a `defaultSubscriber`. That will soon go away. --- app.go | 2 +- internal/websocket/locked_conn.go | 45 +++++++++++++++++++++++++++++ internal/websocket/reader.go | 21 +++++++++++--- internal/websocket/subscriptions.go | 24 ++++++++++++++- internal/websocket/websocket.go | 20 ++++++------- 5 files changed, 95 insertions(+), 17 deletions(-) diff --git a/app.go b/app.go index f997bab..f9849f8 100644 --- a/app.go +++ b/app.go @@ -362,7 +362,7 @@ func (app *App) Start() { if !ok { break } - if app.entitySubscription.ID() == msg.Id { + if msg.Id == app.entitySubscription.MessageID() { go callEntityListeners(app, msg.Raw) } else { go callEventListeners(app, msg) diff --git a/internal/websocket/locked_conn.go b/internal/websocket/locked_conn.go index 8557b71..66da9fe 100644 --- a/internal/websocket/locked_conn.go +++ b/internal/websocket/locked_conn.go @@ -11,6 +11,26 @@ type LockedConn interface { // `LockedConn` is still active. NextMessageID() int64 + // Subscribe allocates a new message ID and subscribes + // `subscriber` to it, in the sense that the subscriber will be + // called for any incoming messages that have that ID. This + // doesn't actually interact with the server. Typically the next + // step would be to send a message with its message ID set to + // `Subscription.ID()`. + // + // The returned `Subscription` must eventually be passed at least + // once to `Unsubscribe()`, though `Unsubscribe()` can be called + // against a different `LockedConn` than the one that generated + // it. + Subscribe(subscriber Subscriber) Subscription + + // Unsubscribe terminates `subscription` at the websocket level; + // i.e., no more incoming messages will be forwarded to the + // corresponding `Subscriber`. Note that this does not interact + // with the server; it is the caller's responsibility to send it + // an "unsubscribe" command if necessary. + Unsubscribe(subscription Subscription) + // SendMessage sends the specified message over the websocket // connection. `msg` must be JSON-serializable and have the // correct format and a unique, monotonically-increasing ID, which @@ -30,6 +50,31 @@ func (lc lockedConn) NextMessageID() int64 { return lc.conn.lastMessageID } +// Subscribe implements [LockedConn.Subscribe]. +func (lc lockedConn) Subscribe(subscriber Subscriber) Subscription { + lc.conn.subscribersLock.Lock() + defer lc.conn.subscribersLock.Unlock() + + id := lc.NextMessageID() + lc.conn.subscribers[id] = subscriber + return Subscription{ + messageID: id, + } +} + +// Unsubscribe implements [LockedConn.Unsubscribe]. +func (lc lockedConn) Unsubscribe(subscription Subscription) { + if subscription.messageID == 0 { + return + } + + lc.conn.subscribersLock.Lock() + defer lc.conn.subscribersLock.Unlock() + + delete(lc.conn.subscribers, subscription.messageID) + subscription.messageID = 0 +} + // SendMessage implements [LockedConn.SendMessage]. func (lc lockedConn) SendMessage(msg any) error { if err := lc.conn.conn.WriteJSON(msg); err != nil { diff --git a/internal/websocket/reader.go b/internal/websocket/reader.go index d1efaaa..34a7011 100644 --- a/internal/websocket/reader.go +++ b/internal/websocket/reader.go @@ -19,9 +19,15 @@ type ChanMsg struct { } // ListenWebsocket reads JSON-formatted messages from `conn`, partly -// deserializes them, and passes them to `subscriber`. If there is an -// error, return the error. -func (conn *Conn) ListenWebsocket(subscriber Subscriber) error { +// deserializes them, and passes them to `defaultSubscriber`, as well +// as the subscriber that has subscribed to that message ID (if any). +// If there is an error, return the error and stop listening. +// +// Note that the subscribers are invoked synchronously, in the same +// order as the messages arrived, and only one is run at a time. If +// the subscriber wants processing to happen in the background, it +// must spawn a goroutine itself. +func (conn *Conn) ListenWebsocket(defaultSubscriber Subscriber) error { for { bytes, err := conn.readMessage() if err != nil { @@ -43,6 +49,13 @@ func (conn *Conn) ListenWebsocket(subscriber Subscriber) error { Raw: bytes, } - subscriber(chanMsg) + // Call the default subscriber in any case: + defaultSubscriber(chanMsg) + + // If a subscriber has been registered for this message ID, + // then call it, too: + if subr, ok := conn.getSubscriber(base.Id); ok { + subr(chanMsg) + } } } diff --git a/internal/websocket/subscriptions.go b/internal/websocket/subscriptions.go index 2ef08ae..854c371 100644 --- a/internal/websocket/subscriptions.go +++ b/internal/websocket/subscriptions.go @@ -1,5 +1,27 @@ package websocket +// Subscription represents a websocket-level subscription to a +// particular message ID. +type Subscription struct { + messageID int64 +} + +// MessageID returns the message ID that this subscription is +// subscribed to. +func (sub Subscription) MessageID() int64 { + return sub.messageID +} + // Subscriber is called synchronously when a message is received that -// matches its subscription. +// matches its subscription's message ID. type Subscriber func(msg ChanMsg) + +// getSubscriber returns the subscriber, if any, that is subscribed to +// the specified message ID. +func (conn *Conn) getSubscriber(messageID int64) (Subscriber, bool) { + conn.subscribersLock.RLock() + defer conn.subscribersLock.RUnlock() + + subscriber, ok := conn.subscribers[messageID] + return subscriber, ok +} diff --git a/internal/websocket/websocket.go b/internal/websocket/websocket.go index b91445f..404061a 100644 --- a/internal/websocket/websocket.go +++ b/internal/websocket/websocket.go @@ -27,6 +27,13 @@ type Conn struct { conn *websocket.Conn writeLock sync.Mutex lastMessageID int64 + + // subscribersLock guards access to `subscribers`. + subscribersLock sync.RWMutex + + // subscribers is a map from message ID to the subscriber that is + // subscribed to messages with that ID. + subscribers map[int64]Subscriber } func (conn *Conn) readMessage() ([]byte, error) { @@ -59,7 +66,8 @@ func NewConn( } conn := Conn{ - conn: gConn, + conn: gConn, + subscribers: make(map[int64]Subscriber), } // Read auth_required message @@ -127,16 +135,6 @@ type SubEvent struct { EventType string `json:"event_type"` } -// Subscription represents a websocket-level subscription to a -// particular message ID. -type Subscription struct { - id int64 -} - -func (sub Subscription) ID() int64 { - return sub.id -} - func SubscribeToStateChangedEvents(conn *Conn) Subscription { return SubscribeToEventType("state_changed", conn) } From 2896c7f27f9ef6edc3674fdf12e664c21a522eb1 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 09:11:32 +0100 Subject: [PATCH 03/16] Make the "subscribe" functions methods of `Conn` --- app.go | 4 +-- internal/websocket/subscriptions.go | 44 +++++++++++++++++++++++++++++ internal/websocket/websocket.go | 40 -------------------------- 3 files changed, 46 insertions(+), 42 deletions(-) diff --git a/app.go b/app.go index f9849f8..2ec0d3f 100644 --- a/app.go +++ b/app.go @@ -258,7 +258,7 @@ func (app *App) RegisterEventListeners(evls ...EventListener) { if elList, ok := app.eventListeners[eventType]; ok { app.eventListeners[eventType] = append(elList, &evl) } else { - websocket.SubscribeToEventType(eventType, app.conn) + app.conn.SubscribeToEventType(eventType) app.eventListeners[eventType] = []*EventListener{&evl} } } @@ -316,7 +316,7 @@ func (app *App) Start() { go app.runScheduledActions(app.ctx) // subscribe to state_changed events - app.entitySubscription = websocket.SubscribeToStateChangedEvents(app.conn) + app.entitySubscription = app.conn.SubscribeToStateChangedEvents() // entity listeners runOnStartup for eid, etls := range app.entityListeners { diff --git a/internal/websocket/subscriptions.go b/internal/websocket/subscriptions.go index 854c371..b73d58b 100644 --- a/internal/websocket/subscriptions.go +++ b/internal/websocket/subscriptions.go @@ -1,5 +1,10 @@ package websocket +import ( + "fmt" + "log/slog" +) + // Subscription represents a websocket-level subscription to a // particular message ID. type Subscription struct { @@ -25,3 +30,42 @@ func (conn *Conn) getSubscriber(messageID int64) (Subscriber, bool) { subscriber, ok := conn.subscribers[messageID] return subscriber, ok } + +type SubEvent struct { + Id int64 `json:"id"` + Type string `json:"type"` + EventType string `json:"event_type"` +} + +func (conn *Conn) SubscribeToEventType(eventType string) Subscription { + var id int64 + err := conn.Send( + func(lc LockedConn) error { + id = lc.NextMessageID() + e := SubEvent{ + Id: id, + Type: "subscribe_events", + EventType: eventType, + } + + if err := lc.SendMessage(e); err != nil { + return fmt.Errorf("error writing to websocket: %w", err) + } + // m, _ := ReadMessage(ctx, conn) + // log.Default().Println(string(m)) + + return nil + }, + ) + + if err != nil { + slog.Error(err.Error()) + panic(err) + } + + return Subscription{id} +} + +func (conn *Conn) SubscribeToStateChangedEvents() Subscription { + return conn.SubscribeToEventType("state_changed") +} diff --git a/internal/websocket/websocket.go b/internal/websocket/websocket.go index 404061a..25aa117 100644 --- a/internal/websocket/websocket.go +++ b/internal/websocket/websocket.go @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "log/slog" "net/url" "sync" @@ -128,42 +127,3 @@ func (conn *Conn) verifyAuthResponse(ctx context.Context) error { return nil } - -type SubEvent struct { - Id int64 `json:"id"` - Type string `json:"type"` - EventType string `json:"event_type"` -} - -func SubscribeToStateChangedEvents(conn *Conn) Subscription { - return SubscribeToEventType("state_changed", conn) -} - -func SubscribeToEventType(eventType string, conn *Conn) Subscription { - var id int64 - err := conn.Send( - func(lc LockedConn) error { - id = lc.NextMessageID() - e := SubEvent{ - Id: id, - Type: "subscribe_events", - EventType: eventType, - } - - if err := lc.SendMessage(e); err != nil { - return fmt.Errorf("error writing to websocket: %w", err) - } - // m, _ := ReadMessage(ctx, conn) - // log.Default().Println(string(m)) - - return nil - }, - ) - - if err != nil { - slog.Error(err.Error()) - panic(err) - } - - return Subscription{id} -} From eb965303bbfb11abb0a0873dcc691e8f8105552d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 16:40:50 +0100 Subject: [PATCH 04/16] Conn: change the "subscribe" methods to take a `Subscriber` The subscriber is invoked every time a message arrives with the message ID corresponding to the subscription. --- app.go | 4 ++-- internal/websocket/subscriptions.go | 18 +++++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/app.go b/app.go index 2ec0d3f..4223e71 100644 --- a/app.go +++ b/app.go @@ -258,7 +258,7 @@ func (app *App) RegisterEventListeners(evls ...EventListener) { if elList, ok := app.eventListeners[eventType]; ok { app.eventListeners[eventType] = append(elList, &evl) } else { - app.conn.SubscribeToEventType(eventType) + app.conn.SubscribeToEventType(eventType, websocket.NoopSubscriber) app.eventListeners[eventType] = []*EventListener{&evl} } } @@ -316,7 +316,7 @@ func (app *App) Start() { go app.runScheduledActions(app.ctx) // subscribe to state_changed events - app.entitySubscription = app.conn.SubscribeToStateChangedEvents() + app.entitySubscription = app.conn.SubscribeToStateChangedEvents(websocket.NoopSubscriber) // entity listeners runOnStartup for eid, etls := range app.entityListeners { diff --git a/internal/websocket/subscriptions.go b/internal/websocket/subscriptions.go index b73d58b..2cad7fa 100644 --- a/internal/websocket/subscriptions.go +++ b/internal/websocket/subscriptions.go @@ -21,6 +21,9 @@ func (sub Subscription) MessageID() int64 { // matches its subscription's message ID. type Subscriber func(msg ChanMsg) +// NoopSubscriber is a `Subscriber` that does nothing. +func NoopSubscriber(_ ChanMsg) {} + // getSubscriber returns the subscriber, if any, that is subscribed to // the specified message ID. func (conn *Conn) getSubscriber(messageID int64) (Subscriber, bool) { @@ -37,18 +40,19 @@ type SubEvent struct { EventType string `json:"event_type"` } -func (conn *Conn) SubscribeToEventType(eventType string) Subscription { - var id int64 +func (conn *Conn) SubscribeToEventType(eventType string, subr Subscriber) Subscription { + var subn Subscription err := conn.Send( func(lc LockedConn) error { - id = lc.NextMessageID() + subn = lc.Subscribe(subr) e := SubEvent{ - Id: id, + Id: subn.messageID, Type: "subscribe_events", EventType: eventType, } if err := lc.SendMessage(e); err != nil { + lc.Unsubscribe(subn) return fmt.Errorf("error writing to websocket: %w", err) } // m, _ := ReadMessage(ctx, conn) @@ -63,9 +67,9 @@ func (conn *Conn) SubscribeToEventType(eventType string) Subscription { panic(err) } - return Subscription{id} + return subn } -func (conn *Conn) SubscribeToStateChangedEvents() Subscription { - return conn.SubscribeToEventType("state_changed") +func (conn *Conn) SubscribeToStateChangedEvents(subr Subscriber) Subscription { + return conn.SubscribeToEventType("state_changed", subr) } From 728e056d469940dc39f62ef5268f63571c843f19 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 15:31:09 +0100 Subject: [PATCH 05/16] callEntityListeners(), callEventListeners(): make methods of `App` --- app.go | 4 ++-- entitylistener.go | 2 +- eventListener.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/app.go b/app.go index 4223e71..f637eb7 100644 --- a/app.go +++ b/app.go @@ -363,9 +363,9 @@ func (app *App) Start() { break } if msg.Id == app.entitySubscription.MessageID() { - go callEntityListeners(app, msg.Raw) + go app.callEntityListeners(msg.Raw) } else { - go callEventListeners(app, msg) + go app.callEventListeners(msg) } } } diff --git a/entitylistener.go b/entitylistener.go index e727539..226215c 100644 --- a/entitylistener.go +++ b/entitylistener.go @@ -192,7 +192,7 @@ func (b elBuilder3) Build() EntityListener { } /* Functions */ -func callEntityListeners(app *App, msgBytes []byte) { +func (app *App) callEntityListeners(msgBytes []byte) { msg := stateChangedMsg{} _ = json.Unmarshal(msgBytes, &msg) data := msg.Event.Data diff --git a/eventListener.go b/eventListener.go index a9f9e24..19a3c89 100644 --- a/eventListener.go +++ b/eventListener.go @@ -140,7 +140,7 @@ type BaseEventMsg struct { } /* Functions */ -func callEventListeners(app *App, msg websocket.ChanMsg) { +func (app *App) callEventListeners(msg websocket.ChanMsg) { baseEventMsg := BaseEventMsg{} _ = json.Unmarshal(msg.Raw, &baseEventMsg) listeners, ok := app.eventListeners[baseEventMsg.Event.EventType] From 16173c8e353d11438de05fb37e54ba25574f3b62 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 15:51:56 +0100 Subject: [PATCH 06/16] App.Start(): move more of the message handling into the subscriber The old code wrote incoming messages to a channel, and then (in a separate goroutine) read and processed the messages from the channel. Note that the second goroutine created yet another goroutine for processing each message. Instead, write a `Subscriber` that processes the messages directly (while continuing to create a goroutine for each message). Pass this subscriber to `Conn.ListenWebsocket()` as the default subscriber. In future commits, we will add more specific subscribers for each type of message. --- app.go | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/app.go b/app.go index f637eb7..cee95bd 100644 --- a/app.go +++ b/app.go @@ -342,32 +342,19 @@ func (app *App) Start() { } } - // entity listeners and event listeners - elChan := make(chan websocket.ChanMsg) - go func() { - err := app.conn.ListenWebsocket( - func(msg websocket.ChanMsg) { - elChan <- msg - }, - ) - if err != nil { - close(elChan) - slog.Error("Error reading from websocket", "err", err) - return - } - }() - - for { - msg, ok := <-elChan - if !ok { - break - } + dispatchMessage := func(msg websocket.ChanMsg) { if msg.Id == app.entitySubscription.MessageID() { go app.callEntityListeners(msg.Raw) } else { go app.callEventListeners(msg) } } + + // entity listeners and event listeners + err := app.conn.ListenWebsocket(dispatchMessage) + if err != nil { + slog.Error("Error reading from websocket", "err", err) + } } // runScheduledActions starts a goroutine to run each `DailySchedule` From 0f10fab3b16863bc042d72cbc6d604b47d4e6228 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 16:03:24 +0100 Subject: [PATCH 07/16] App.registerEntityListener(), registierEventListener(): new methods Add helper methods to add single listeners. --- app.go | 46 ++++++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/app.go b/app.go index cee95bd..1016b78 100644 --- a/app.go +++ b/app.go @@ -233,35 +233,41 @@ func (app *App) RegisterIntervals(intervals ...Interval) { } } +func (app *App) registerEntityListener(etl EntityListener) { + if etl.delay != 0 && etl.toState == "" { + slog.Error("EntityListener error: you have to use ToState() when using Duration()") + panic(ErrInvalidArgs) + } + + for _, entity := range etl.entityIds { + if elList, ok := app.entityListeners[entity]; ok { + app.entityListeners[entity] = append(elList, &etl) + } else { + app.entityListeners[entity] = []*EntityListener{&etl} + } + } +} + func (app *App) RegisterEntityListeners(etls ...EntityListener) { for _, etl := range etls { - etl := etl - if etl.delay != 0 && etl.toState == "" { - slog.Error("EntityListener error: you have to use ToState() when using Duration()") - panic(ErrInvalidArgs) - } + app.registerEntityListener(etl) + } +} - for _, entity := range etl.entityIds { - if elList, ok := app.entityListeners[entity]; ok { - app.entityListeners[entity] = append(elList, &etl) - } else { - app.entityListeners[entity] = []*EntityListener{&etl} - } +func (app *App) registerEventListener(evl EventListener) { + for _, eventType := range evl.eventTypes { + if elList, ok := app.eventListeners[eventType]; ok { + app.eventListeners[eventType] = append(elList, &evl) + } else { + app.conn.SubscribeToEventType(eventType, websocket.NoopSubscriber) + app.eventListeners[eventType] = []*EventListener{&evl} } } } func (app *App) RegisterEventListeners(evls ...EventListener) { for _, evl := range evls { - evl := evl - for _, eventType := range evl.eventTypes { - if elList, ok := app.eventListeners[eventType]; ok { - app.eventListeners[eventType] = append(elList, &evl) - } else { - app.conn.SubscribeToEventType(eventType, websocket.NoopSubscriber) - app.eventListeners[eventType] = []*EventListener{&evl} - } - } + app.registerEventListener(evl) } } From 54a60b2b13f08c74e140029c38477b5fdfd09362 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 16:05:32 +0100 Subject: [PATCH 08/16] App.registerEventListener(): simplify logic The thing that really has to be done specially for new event types is to subscribe to them. It is possible to append to a nil list, so that can be done in shared code after the `if`. --- app.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/app.go b/app.go index 1016b78..10a742a 100644 --- a/app.go +++ b/app.go @@ -256,12 +256,11 @@ func (app *App) RegisterEntityListeners(etls ...EntityListener) { func (app *App) registerEventListener(evl EventListener) { for _, eventType := range evl.eventTypes { - if elList, ok := app.eventListeners[eventType]; ok { - app.eventListeners[eventType] = append(elList, &evl) - } else { + elList, ok := app.eventListeners[eventType] + if !ok { app.conn.SubscribeToEventType(eventType, websocket.NoopSubscriber) - app.eventListeners[eventType] = []*EventListener{&evl} } + app.eventListeners[eventType] = append(elList, &evl) } } From fa1db269a9f76acae935e658475fbba1955253b8 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 16:23:15 +0100 Subject: [PATCH 09/16] EventListener.maybeCall(): new method Extract method `EventListener.maybeCall()` from `App.callEventListeners()`. --- eventListener.go | 66 +++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/eventListener.go b/eventListener.go index 19a3c89..84b7219 100644 --- a/eventListener.go +++ b/eventListener.go @@ -133,48 +133,52 @@ func (b eventListenerBuilder3) Build() EventListener { return b.eventListener } -type BaseEventMsg struct { - Event struct { - EventType string `json:"event_type"` - } `json:"event"` +func (l *EventListener) maybeCall(app *App, eventData EventData) { + // Check conditions + if c := checkWithinTimeRange(l.betweenStart, l.betweenEnd); c.fail { + return + } + if c := checkThrottle(l.throttle, l.lastRan); c.fail { + return + } + if c := checkExceptionDates(l.exceptionDates); c.fail { + return + } + if c := checkExceptionRanges(l.exceptionRanges); c.fail { + return + } + if c := checkEnabledEntity(app.state, l.enabledEntities); c.fail { + return + } + if c := checkDisabledEntity(app.state, l.disabledEntities); c.fail { + return + } + + go l.callback(app.service, app.state, eventData) + l.lastRan = carbon.Now() } /* Functions */ func (app *App) callEventListeners(msg websocket.ChanMsg) { - baseEventMsg := BaseEventMsg{} + var baseEventMsg struct { + Event struct { + EventType string `json:"event_type"` + } `json:"event"` + } _ = json.Unmarshal(msg.Raw, &baseEventMsg) + listeners, ok := app.eventListeners[baseEventMsg.Event.EventType] if !ok { // no listeners registered for this event type return } + eventData := EventData{ + Type: baseEventMsg.Event.EventType, + RawEventJSON: msg.Raw, + } + for _, l := range listeners { - // Check conditions - if c := checkWithinTimeRange(l.betweenStart, l.betweenEnd); c.fail { - continue - } - if c := checkThrottle(l.throttle, l.lastRan); c.fail { - continue - } - if c := checkExceptionDates(l.exceptionDates); c.fail { - continue - } - if c := checkExceptionRanges(l.exceptionRanges); c.fail { - continue - } - if c := checkEnabledEntity(app.state, l.enabledEntities); c.fail { - continue - } - if c := checkDisabledEntity(app.state, l.disabledEntities); c.fail { - continue - } - - eventData := EventData{ - Type: baseEventMsg.Event.EventType, - RawEventJSON: msg.Raw, - } - go l.callback(app.service, app.state, eventData) - l.lastRan = carbon.Now() + l.maybeCall(app, eventData) } } From 417a29c2bcfa1cfc702a8e8cb37a770d0d282238 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 16:33:17 +0100 Subject: [PATCH 10/16] App.callEventListeners(): take the event type as an argument Pick the event type out of the message in the caller, and pass it into `App.callEventListeners()`. This will soon enable a simplification. --- app.go | 9 ++++++++- eventListener.go | 14 +++----------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/app.go b/app.go index 10a742a..af843d5 100644 --- a/app.go +++ b/app.go @@ -2,6 +2,7 @@ package gomeassistant import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -351,7 +352,13 @@ func (app *App) Start() { if msg.Id == app.entitySubscription.MessageID() { go app.callEntityListeners(msg.Raw) } else { - go app.callEventListeners(msg) + var baseEventMsg struct { + Event struct { + EventType string `json:"event_type"` + } `json:"event"` + } + _ = json.Unmarshal(msg.Raw, &baseEventMsg) + go app.callEventListeners(baseEventMsg.Event.EventType, msg) } } diff --git a/eventListener.go b/eventListener.go index 84b7219..2f1b92e 100644 --- a/eventListener.go +++ b/eventListener.go @@ -1,7 +1,6 @@ package gomeassistant import ( - "encoding/json" "fmt" "time" @@ -159,22 +158,15 @@ func (l *EventListener) maybeCall(app *App, eventData EventData) { } /* Functions */ -func (app *App) callEventListeners(msg websocket.ChanMsg) { - var baseEventMsg struct { - Event struct { - EventType string `json:"event_type"` - } `json:"event"` - } - _ = json.Unmarshal(msg.Raw, &baseEventMsg) - - listeners, ok := app.eventListeners[baseEventMsg.Event.EventType] +func (app *App) callEventListeners(eventType string, msg websocket.ChanMsg) { + listeners, ok := app.eventListeners[eventType] if !ok { // no listeners registered for this event type return } eventData := EventData{ - Type: baseEventMsg.Event.EventType, + Type: eventType, RawEventJSON: msg.Raw, } From bad0cd1436d180a2c8b560b68d1977656a430ebb Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 7 Dec 2025 23:04:32 +0100 Subject: [PATCH 11/16] registerEntityListener(): simplify logic It is OK to append to a nil slice. --- app.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/app.go b/app.go index af843d5..d16c52f 100644 --- a/app.go +++ b/app.go @@ -241,11 +241,7 @@ func (app *App) registerEntityListener(etl EntityListener) { } for _, entity := range etl.entityIds { - if elList, ok := app.entityListeners[entity]; ok { - app.entityListeners[entity] = append(elList, &etl) - } else { - app.entityListeners[entity] = []*EntityListener{&etl} - } + app.entityListeners[entity] = append(app.entityListeners[entity], &etl) } } From 622992afc49e4b43b71d54bb5381dcdc53b7e883 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 16:57:19 +0100 Subject: [PATCH 12/16] App.registerEventListener(): register a subscriber for each event type Use `Conn`'s subscription feature to register a listener specific to each type of event that we are watching. This allows some logic to be removed from the default subscriber, because when the even arrives, we know what event type it is for based on its message ID. --- app.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/app.go b/app.go index d16c52f..de90403 100644 --- a/app.go +++ b/app.go @@ -2,7 +2,6 @@ package gomeassistant import ( "context" - "encoding/json" "errors" "fmt" "log/slog" @@ -255,7 +254,17 @@ func (app *App) registerEventListener(evl EventListener) { for _, eventType := range evl.eventTypes { elList, ok := app.eventListeners[eventType] if !ok { - app.conn.SubscribeToEventType(eventType, websocket.NoopSubscriber) + // We're not listening to that event type yet. Ask HA to + // send them to us, and when they arrive, call any event + // listeners for that type (including any that are + // registered in the future). + eventType := eventType + app.conn.SubscribeToEventType( + eventType, + func(msg websocket.ChanMsg) { + go app.callEventListeners(eventType, msg) + }, + ) } app.eventListeners[eventType] = append(elList, &evl) } @@ -347,14 +356,6 @@ func (app *App) Start() { dispatchMessage := func(msg websocket.ChanMsg) { if msg.Id == app.entitySubscription.MessageID() { go app.callEntityListeners(msg.Raw) - } else { - var baseEventMsg struct { - Event struct { - EventType string `json:"event_type"` - } `json:"event"` - } - _ = json.Unmarshal(msg.Raw, &baseEventMsg) - go app.callEventListeners(baseEventMsg.Event.EventType, msg) } } From 955362cf0a67b343d69552fc01da0bb5274a9299 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 17:09:55 +0100 Subject: [PATCH 13/16] EntityListener.maybeCall(): new method Extract method `EntityListener.maybeCall()` from `App.callEntityListeners()`. --- entitylistener.go | 120 ++++++++++++++++++++++++---------------------- 1 file changed, 63 insertions(+), 57 deletions(-) diff --git a/entitylistener.go b/entitylistener.go index 226215c..a6e45c3 100644 --- a/entitylistener.go +++ b/entitylistener.go @@ -49,16 +49,18 @@ type stateChangedMsg struct { ID int `json:"id"` Type string `json:"type"` Event struct { - Data struct { - EntityID string `json:"entity_id"` - NewState msgState `json:"new_state"` - OldState msgState `json:"old_state"` - } `json:"data"` - EventType string `json:"event_type"` - Origin string `json:"origin"` + Data stateData `json:"data"` + EventType string `json:"event_type"` + Origin string `json:"origin"` } `json:"event"` } +type stateData struct { + EntityID string `json:"entity_id"` + NewState msgState `json:"new_state"` + OldState msgState `json:"old_state"` +} + type msgState struct { EntityID string `json:"entity_id"` LastChanged time.Time `json:"last_changed"` @@ -191,6 +193,50 @@ func (b elBuilder3) Build() EntityListener { return b.entityListener } +func (l *EntityListener) maybeCall(app *App, entityData EntityData, data stateData) { + // Check conditions + if c := checkWithinTimeRange(l.betweenStart, l.betweenEnd); c.fail { + return + } + if c := checkStatesMatch(l.fromState, data.OldState.State); c.fail { + return + } + if c := checkStatesMatch(l.toState, data.NewState.State); c.fail { + if l.delayTimer != nil { + l.delayTimer.Stop() + } + return + } + if c := checkThrottle(l.throttle, l.lastRan); c.fail { + return + } + if c := checkExceptionDates(l.exceptionDates); c.fail { + return + } + if c := checkExceptionRanges(l.exceptionRanges); c.fail { + return + } + if c := checkEnabledEntity(app.state, l.enabledEntities); c.fail { + return + } + if c := checkDisabledEntity(app.state, l.disabledEntities); c.fail { + return + } + + if l.delay != 0 { + l := l + l.delayTimer = time.AfterFunc(l.delay, func() { + go l.callback(app.service, app.state, entityData) + l.lastRan = carbon.Now() + }) + return + } + + // run now if no delay set + go l.callback(app.service, app.state, entityData) + l.lastRan = carbon.Now() +} + /* Functions */ func (app *App) callEntityListeners(msgBytes []byte) { msg := stateChangedMsg{} @@ -211,56 +257,16 @@ func (app *App) callEntityListeners(msgBytes []byte) { return } - for _, l := range listeners { - // Check conditions - if c := checkWithinTimeRange(l.betweenStart, l.betweenEnd); c.fail { - continue - } - if c := checkStatesMatch(l.fromState, data.OldState.State); c.fail { - continue - } - if c := checkStatesMatch(l.toState, data.NewState.State); c.fail { - if l.delayTimer != nil { - l.delayTimer.Stop() - } - continue - } - if c := checkThrottle(l.throttle, l.lastRan); c.fail { - continue - } - if c := checkExceptionDates(l.exceptionDates); c.fail { - continue - } - if c := checkExceptionRanges(l.exceptionRanges); c.fail { - continue - } - if c := checkEnabledEntity(app.state, l.enabledEntities); c.fail { - continue - } - if c := checkDisabledEntity(app.state, l.disabledEntities); c.fail { - continue - } - - entityData := EntityData{ - TriggerEntityId: eid, - FromState: data.OldState.State, - FromAttributes: data.OldState.Attributes, - ToState: data.NewState.State, - ToAttributes: data.NewState.Attributes, - LastChanged: data.OldState.LastChanged, - } - - if l.delay != 0 { - l := l - l.delayTimer = time.AfterFunc(l.delay, func() { - go l.callback(app.service, app.state, entityData) - l.lastRan = carbon.Now() - }) - continue - } + entityData := EntityData{ + TriggerEntityId: eid, + FromState: data.OldState.State, + FromAttributes: data.OldState.Attributes, + ToState: data.NewState.State, + ToAttributes: data.NewState.Attributes, + LastChanged: data.OldState.LastChanged, + } - // run now if no delay set - go l.callback(app.service, app.state, entityData) - l.lastRan = carbon.Now() + for _, l := range listeners { + l.maybeCall(app, entityData, data) } } From c015eda6c23e081a350e3fd5a6c39d8b8644f58d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 17:11:02 +0100 Subject: [PATCH 14/16] App: register a subscriber for state changed events Use `Conn`'s subscription feature to register a listener specific to state changed events. This means that we can change the default subscriber to a `NoopSubscriber`. --- app.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/app.go b/app.go index de90403..618d43d 100644 --- a/app.go +++ b/app.go @@ -327,7 +327,11 @@ func (app *App) Start() { go app.runScheduledActions(app.ctx) // subscribe to state_changed events - app.entitySubscription = app.conn.SubscribeToStateChangedEvents(websocket.NoopSubscriber) + app.entitySubscription = app.conn.SubscribeToStateChangedEvents( + func(msg websocket.ChanMsg) { + go app.callEntityListeners(msg.Raw) + }, + ) // entity listeners runOnStartup for eid, etls := range app.entityListeners { @@ -353,14 +357,8 @@ func (app *App) Start() { } } - dispatchMessage := func(msg websocket.ChanMsg) { - if msg.Id == app.entitySubscription.MessageID() { - go app.callEntityListeners(msg.Raw) - } - } - // entity listeners and event listeners - err := app.conn.ListenWebsocket(dispatchMessage) + err := app.conn.ListenWebsocket(websocket.NoopSubscriber) if err != nil { slog.Error("Error reading from websocket", "err", err) } From 654e9fe2f489f99286b2d5b68a0be12da472a8b8 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 6 Dec 2025 17:13:36 +0100 Subject: [PATCH 15/16] Conn.ListenWebsocket(): git rid of the default subscriber We don't use it anymore. --- app.go | 2 +- internal/websocket/reader.go | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/app.go b/app.go index 618d43d..87c1f95 100644 --- a/app.go +++ b/app.go @@ -358,7 +358,7 @@ func (app *App) Start() { } // entity listeners and event listeners - err := app.conn.ListenWebsocket(websocket.NoopSubscriber) + err := app.conn.ListenWebsocket() if err != nil { slog.Error("Error reading from websocket", "err", err) } diff --git a/internal/websocket/reader.go b/internal/websocket/reader.go index 34a7011..dbe6036 100644 --- a/internal/websocket/reader.go +++ b/internal/websocket/reader.go @@ -19,15 +19,15 @@ type ChanMsg struct { } // ListenWebsocket reads JSON-formatted messages from `conn`, partly -// deserializes them, and passes them to `defaultSubscriber`, as well -// as the subscriber that has subscribed to that message ID (if any). -// If there is an error, return the error and stop listening. +// deserializes them, and passes them to the subscriber that has +// subscribed to that message ID (if any). If there is an error, +// return the error and stop listening. // // Note that the subscribers are invoked synchronously, in the same // order as the messages arrived, and only one is run at a time. If // the subscriber wants processing to happen in the background, it // must spawn a goroutine itself. -func (conn *Conn) ListenWebsocket(defaultSubscriber Subscriber) error { +func (conn *Conn) ListenWebsocket() error { for { bytes, err := conn.readMessage() if err != nil { @@ -49,9 +49,6 @@ func (conn *Conn) ListenWebsocket(defaultSubscriber Subscriber) error { Raw: bytes, } - // Call the default subscriber in any case: - defaultSubscriber(chanMsg) - // If a subscriber has been registered for this message ID, // then call it, too: if subr, ok := conn.getSubscriber(base.Id); ok { From 7815d6bcaa89d274a4ff9e898050b4c7f205f12d Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 7 Dec 2025 23:26:04 +0100 Subject: [PATCH 16/16] Conn.Run(): method renamed from `ListenWebsocket()` Now that the method handles incoming messages internally, all that its caller really has to know is that it is a method that has to be called to manage the connection. --- app.go | 5 ++--- internal/websocket/reader.go | 11 ++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/app.go b/app.go index 87c1f95..7020862 100644 --- a/app.go +++ b/app.go @@ -357,9 +357,8 @@ func (app *App) Start() { } } - // entity listeners and event listeners - err := app.conn.ListenWebsocket() - if err != nil { + // Start listen on the connection for incoming messages: + if err := app.conn.Run(); err != nil { slog.Error("Error reading from websocket", "err", err) } } diff --git a/internal/websocket/reader.go b/internal/websocket/reader.go index dbe6036..f1cce6f 100644 --- a/internal/websocket/reader.go +++ b/internal/websocket/reader.go @@ -18,16 +18,17 @@ type ChanMsg struct { Raw []byte } -// ListenWebsocket reads JSON-formatted messages from `conn`, partly -// deserializes them, and passes them to the subscriber that has -// subscribed to that message ID (if any). If there is an error, -// return the error and stop listening. +// Run processes incoming messages from `Conn`. It reads +// JSON-formatted messages from `conn`, partly deserializes them, and +// passes them to the subscriber that has subscribed to that message +// ID (if any). If there is an error, return the error and stop +// listening. // // Note that the subscribers are invoked synchronously, in the same // order as the messages arrived, and only one is run at a time. If // the subscriber wants processing to happen in the background, it // must spawn a goroutine itself. -func (conn *Conn) ListenWebsocket() error { +func (conn *Conn) Run() error { for { bytes, err := conn.readMessage() if err != nil {