-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrigger.go
More file actions
272 lines (232 loc) · 5.64 KB
/
Copy pathtrigger.go
File metadata and controls
272 lines (232 loc) · 5.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package evtrigger
import (
"context"
"errors"
"sync"
"go.uber.org/zap"
)
// Fallback sizes applied by NewTrigger.
var (
	// defaultWorkerSize is the number of worker goroutines started when the
	// configured worker size is not positive.
	defaultWorkerSize = 1
	// defaultBufferSize is the capacity of the channel that hands events to
	// the worker goroutines.
	defaultBufferSize = 128
)
// Sentinel errors returned by the trigger implementation.
var (
	// ErrCallbackNil is returned by Register when the callback is nil.
	ErrCallbackNil = errors.New("trigger: callback nil")
	// ErrCallbackExist is returned by Register when the key already has a
	// callback.
	ErrCallbackExist = errors.New("trigger: callback exist")
	// ErrEventIllegal is returned by Put when the event is nil or its key is
	// empty.
	ErrEventIllegal = errors.New("trigger: event error")
)
// itemNode is one link of the singly linked list behind itemList.
//
// Reference:
// https://github.com/grpc/grpc-go/blob/689f7b154ee8a3f3ab6a6107ff7ad78189baae06/internal/transport/controlbuf.go#L40
type itemNode struct {
	it   interface{}
	next *itemNode
}

// itemList is a FIFO queue built on a singly linked list. It does no locking
// of its own.
type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue appends i to the tail of the list.
func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		// Empty list: the new node is both head and tail.
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// dequeue removes the item at the head of the list and returns it. It returns
// nil when the list is empty.
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		// The last node was removed; clear tail so enqueue starts afresh.
		il.tail = nil
	}
	return i
}

// ForEach calls visitor on every queued item in head-to-tail order without
// removing anything. It stops at the first error returned by visitor and
// returns that error; otherwise it returns nil.
func (il *itemList) ForEach(visitor func(it interface{}) error) error {
	// Walk until the end of the chain (nil) rather than until the tail node,
	// so the tail item itself is visited as well.
	for n := il.head; n != nil; n = n.next {
		if err := visitor(n.it); err != nil {
			return err
		}
	}
	return nil
}
// Trigger accepts events and hands them to callbacks registered per key.
type Trigger interface {
	// Register associates callback with key.
	Register(key string, callback TriggerCallback) error
	// Put submits event for delivery.
	Put(event *TriggerEvent) error
	// ForEach calls visitor on queued items; a non-nil error from visitor is
	// returned to the caller.
	ForEach(visitor func(it interface{}) error) error
	// Close shuts the trigger down.
	Close()
}
// TriggerEvent is one event submitted through Put.
type TriggerEvent struct {
	// Key selects the registered callback that receives the event.
	Key string
	// Value is the payload passed to that callback unchanged.
	Value interface{}
}
// trigger is the Trigger implementation returned by NewTrigger.
type trigger struct {
	ctx        context.Context
	cancelFunc context.CancelFunc

	// wg tracks the background goroutines so Close can wait for them to exit.
	wg sync.WaitGroup

	lg *zap.Logger

	// listMu is held by Put and get while they touch list and consumerWaiting.
	listMu sync.Mutex

	// list is an unbounded buffer holding externally submitted events (a
	// channel would need a fixed bound).
	// The itemList is not partitioned by key, so events sharing a key may run
	// concurrently whenever the goroutine pool has free workers; callbacks
	// supplied by the caller therefore have to be thread-safe. Even if events
	// of one key were delivered in order, updates for different keys could
	// still touch the same resources inside the caller's callbacks.
	list *itemList

	// The design of ch and consumerWaiting follows:
	// https://github.com/grpc/grpc-go/blob/689f7b154ee8a3f3ab6a6107ff7ad78189baae06/internal/transport/controlbuf.go#L286
	// ch wakes the goroutine that is waiting for new elements in list.
	ch chan struct{}

	// consumerWaiting: the rationale in grpc-go is that loopyWriter's run
	// spends little time in get (fetching events) compared with network I/O,
	// so submitting an event must not get stuck on a write to ch. Chaining
	// everything through ch, even with some buffering, would still couple
	// writing responses out with the arrival of reply data.
	consumerWaiting bool

	// callbacks maps an event key to the callback that handles it.
	callbacks map[string]TriggerCallback

	// buffer holds the events handed over to the goroutine pool.
	buffer chan *TriggerEvent
}
// TriggerCallback receives the key and value of an event on behalf of the
// caller that registered it.
type TriggerCallback func(key string, value interface{}) error
// triggerOptions collects the settings that TriggerOption functions fill in
// for NewTrigger.
type triggerOptions struct {
	// workerSize is the number of goroutines that run callbacks.
	workerSize int
	// lg is the logger to use; NewTrigger builds one itself when it is nil.
	lg *zap.Logger
}
// TriggerOption adjusts one setting of the options passed to NewTrigger.
type TriggerOption func(options *triggerOptions)
// WithWorkerSize sets how many worker goroutines run callbacks. NewTrigger
// replaces a value of zero or less with defaultWorkerSize.
func WithWorkerSize(v int) TriggerOption {
	return func(o *triggerOptions) { o.workerSize = v }
}
// WithLogger sets the zap logger used by the trigger. When v is nil,
// NewTrigger creates a production logger instead.
func WithLogger(v *zap.Logger) TriggerOption {
	return func(o *triggerOptions) { o.lg = v }
}
// NewTrigger builds a Trigger from the given options and starts its
// background goroutines: workerSize workers running callbacks (run) plus one
// goroutine moving queued events to the workers (get). Call Close to stop
// them.
//
// When no logger is supplied a zap production logger is created; if that
// fails, the error is returned and nothing is started.
func NewTrigger(opts ...TriggerOption) (Trigger, error) {
	ops := &triggerOptions{}
	for _, opt := range opts {
		// Tolerate nil options instead of panicking on the call.
		if opt != nil {
			opt(ops)
		}
	}

	// Resolve the logger before anything is started so a failure leaves no
	// context or goroutine behind.
	lg := ops.lg
	if lg == nil {
		var err error
		if lg, err = zap.NewProduction(); err != nil {
			return nil, err
		}
	}

	workerSize := ops.workerSize
	if workerSize <= 0 {
		workerSize = defaultWorkerSize
	}

	ctx, cancel := context.WithCancel(context.Background())
	tgr := &trigger{
		ctx:        ctx,
		cancelFunc: cancel,
		lg:         lg,
		list:       &itemList{},
		buffer:     make(chan *TriggerEvent, defaultBufferSize),
		// Capacity 1 lets Put leave a wake-up signal without blocking.
		ch:        make(chan struct{}, 1),
		callbacks: make(map[string]TriggerCallback),
	}

	// Start the workers that run callbacks.
	for i := 0; i < workerSize; i++ {
		tgr.wg.Add(1)
		go tgr.run()
	}

	// Start the goroutine that feeds queued events to the workers.
	tgr.wg.Add(1)
	go tgr.get()

	return tgr, nil
}
// Register stores callback as the handler for events whose Key equals key.
//
// It returns ErrCallbackNil when callback is nil and ErrCallbackExist (after
// logging a warning) when key already has a handler; an existing handler is
// never replaced.
//
// NOTE(review): no lock is taken around the callbacks map here, and run reads
// the same map from the worker goroutines — presumably callers are expected
// to register everything before the first Put; confirm.
func (tgr *trigger) Register(key string, callback TriggerCallback) error {
	if callback == nil {
		return ErrCallbackNil
	}

	if _, exists := tgr.callbacks[key]; exists {
		tgr.lg.Warn("key already exist", zap.String("key", key))
		return ErrCallbackExist
	}

	tgr.callbacks[key] = callback
	return nil
}
// Close cancels the trigger's context and then blocks until every goroutine
// tracked by wg has returned.
func (tgr *trigger) Close() {
	if cancel := tgr.cancelFunc; cancel != nil {
		cancel()
	}
	tgr.wg.Wait()
}
// Put appends event to the internal queue and, if the consumer goroutine had
// marked itself as waiting, signals it through ch. The signal is sent without
// blocking.
//
// It returns ErrEventIllegal when event is nil or has an empty Key, and nil
// otherwise.
func (tgr *trigger) Put(event *TriggerEvent) error {
	if event == nil || event.Key == "" {
		return ErrEventIllegal
	}

	tgr.listMu.Lock()
	// Take over the "consumer is waiting" flag: whoever clears it is
	// responsible for sending the wake-up.
	notify := tgr.consumerWaiting
	tgr.consumerWaiting = false
	tgr.list.enqueue(event)
	tgr.listMu.Unlock()

	if !notify {
		return nil
	}

	// Non-blocking send: if a signal is already pending in ch, one is enough.
	select {
	case tgr.ch <- struct{}{}:
	default:
	}
	return nil
}
// get is the consumer loop started by NewTrigger. It moves events from the
// unbounded list into buffer for the workers and, when the list is empty,
// sleeps until Put signals ch. It returns once the trigger's context is
// cancelled.
//
// An event that has been dequeued but cannot be handed to a worker before
// cancellation is dropped, like the events still sitting in the list.
func (tgr *trigger) get() {
	defer tgr.wg.Done()

	for {
		tgr.listMu.Lock()
		h := tgr.list.dequeue()
		if h == nil {
			// Nothing queued: tell Put that a wake-up is needed.
			tgr.consumerWaiting = true
		}
		tgr.listMu.Unlock()

		if h != nil {
			// Hand the event over, but never block past cancellation: once
			// the workers have exited nobody drains buffer, and a plain send
			// on a full buffer would keep Close waiting forever.
			select {
			case tgr.buffer <- h.(*TriggerEvent):
			case <-tgr.ctx.Done():
				tgr.lg.Info("get exit")
				return
			}
			continue
		}

		select {
		case <-tgr.ch:
			// Woken by Put. This avoids polling; otherwise an empty list
			// would mean sleeping for some configurable interval.
		case <-tgr.ctx.Done():
			tgr.lg.Info("get exit")
			return
		}
	}
}
// run is the worker loop started by NewTrigger. It takes events from buffer
// and invokes the callback registered for the event's key, logging any error
// the callback returns. It returns once the trigger's context is cancelled.
func (tgr *trigger) run() {
	defer tgr.wg.Done()

	for {
		select {
		case <-tgr.ctx.Done():
			tgr.lg.Info("run exit")
			return
		case ev := <-tgr.buffer:
			// Put does not check that a key is registered, so the lookup can
			// come back empty; calling the nil func would panic the worker.
			cb := tgr.callbacks[ev.Key]
			if cb == nil {
				tgr.lg.Warn(
					"callback not registered",
					zap.String("ev-key", ev.Key),
				)
				continue
			}
			if err := cb(ev.Key, ev.Value); err != nil {
				tgr.lg.Error(
					"callback error",
					zap.Error(err),
					zap.String("ev-key", ev.Key),
				)
			}
		}
	}
}
// ForEach calls visitor on the events that were queued in the list at the
// time of the call and returns the first error visitor reports, or nil.
//
// The list is copied while holding listMu, the lock Put and get hold when
// they modify it, and visitor runs after the lock is released. visitor may
// therefore call Put, and a slow visitor does not stall producers; events it
// sees may already have been handed to a worker by the time it runs.
func (tgr *trigger) ForEach(visitor func(it interface{}) error) error {
	var snapshot []interface{}

	tgr.listMu.Lock()
	// The collecting visitor never fails, so the returned error is always nil.
	_ = tgr.list.ForEach(func(it interface{}) error {
		snapshot = append(snapshot, it)
		return nil
	})
	tgr.listMu.Unlock()

	for _, it := range snapshot {
		if err := visitor(it); err != nil {
			return err
		}
	}
	return nil
}