-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.go
More file actions
194 lines (164 loc) · 4.91 KB
/
api.go
File metadata and controls
194 lines (164 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// MatterbridgeMessage represents a message from/to Matterbridge
type MatterbridgeMessage struct {
Text string `json:"text"`
Username string `json:"username"`
Gateway string `json:"gateway"`
Event string `json:"event,omitempty"`
}
// MessageHandler is a callback function type for handling messages from Matterbridge
type MessageHandler func(msg MatterbridgeMessage) error
// MatterbridgeClient handles all communication with Matterbridge
type MatterbridgeClient struct {
url string
apiToken string
logger *logrus.Logger
httpClient *http.Client
streamHTTPClient *http.Client
stopChan chan struct{}
wg sync.WaitGroup
}
// NewMatterbridgeClient creates a new Matterbridge client
func NewMatterbridgeClient(url string, apiToken string, logger *logrus.Logger) *MatterbridgeClient {
return &MatterbridgeClient{
url: url,
apiToken: apiToken,
logger: logger,
httpClient: &http.Client{Timeout: 30 * time.Second},
streamHTTPClient: &http.Client{}, // No timeout for streaming
stopChan: make(chan struct{}),
}
}
// Start begins the Matterbridge client operations
func (c *MatterbridgeClient) Start(messageHandler MessageHandler) {
c.wg.Add(1)
go c.streamMessages(messageHandler)
}
// Stop stops the Matterbridge client
func (c *MatterbridgeClient) Stop() {
c.logger.Debug("Stopping Matterbridge client...")
close(c.stopChan)
c.wg.Wait()
}
// SendMessage sends a message to Matterbridge
func (c *MatterbridgeClient) SendMessage(msg MatterbridgeMessage) error {
jsonData, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
req, err := http.NewRequest("POST", c.url+"/api/message", bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if c.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+c.apiToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return nil
}
// streamMessages continuously streams for messages from Matterbridge
func (c *MatterbridgeClient) streamMessages(messageHandler MessageHandler) {
defer c.wg.Done()
c.logger.Info("Streaming Matterbridge messages...")
retryDelay := 5 * time.Second
maxRetryDelay := 60 * time.Second
for {
select {
case <-c.stopChan:
return
default:
if err := c.doStreamMessages(messageHandler); err != nil {
c.logger.Errorf("Failed to stream messages from Matterbridge: %v", err)
c.logger.Infof("Retrying in %v...", retryDelay)
select {
case <-c.stopChan:
return
case <-time.After(retryDelay):
// Exponential backoff
retryDelay *= 2
if retryDelay > maxRetryDelay {
retryDelay = maxRetryDelay
}
}
} else {
// Reset retry delay on successful connection
retryDelay = 5 * time.Second
}
}
}
}
// doStreamMessages streams messages from Matterbridge and calls the handler for each message
func (c *MatterbridgeClient) doStreamMessages(messageHandler MessageHandler) error {
req, err := http.NewRequest("GET", c.url+"/api/stream", nil)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
if c.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+c.apiToken)
}
// Create a context that gets cancelled when stopChan is closed
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cancel the context when stopChan is closed
go func() {
select {
case <-c.stopChan:
cancel()
case <-ctx.Done():
}
}()
req = req.WithContext(ctx)
resp, err := c.streamHTTPClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
c.logger.Info("Connected to Matterbridge stream")
decoder := json.NewDecoder(resp.Body)
for {
select {
case <-ctx.Done():
c.logger.Info("Stopping stream")
return nil
default:
var msg MatterbridgeMessage
if err := decoder.Decode(&msg); err != nil {
if err.Error() == "EOF" || ctx.Err() != nil {
return nil
}
return fmt.Errorf("failed to decode message: %w", err)
}
// Skip empty messages and connection events
if msg.Username == "" || msg.Text == "" || msg.Event == "api_connected" {
continue
}
js, _ := json.Marshal(msg)
c.logger.Infof("Received message from Matterbridge: %s", string(js))
// Call the message handler
if err := messageHandler(msg); err != nil {
c.logger.Errorf("Message handler failed: %v", err)
}
}
}
}