-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathasync_handler.go
More file actions
84 lines (75 loc) · 2.23 KB
/
async_handler.go
File metadata and controls
84 lines (75 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package clog
import (
"sync"
)
const defaultCapacity = 100
// AsyncHandler asynchronously send log messages to the next handler in the chain
type AsyncHandler struct {
*middlewareHandler
wg sync.WaitGroup
entries chan LogEntry
done chan interface{}
closed bool
}
// NewAsyncHandler returns a handler that sends logs asynchronously.
func NewAsyncHandler(destination Handler) *AsyncHandler {
return NewAsyncHandlerWithCapacity(destination, defaultCapacity)
}
// NewAsyncHandlerWithCapacity returns a handler that sends logs asynchronously.
func NewAsyncHandlerWithCapacity(next Handler, capacity uint) *AsyncHandler {
entries := make(chan LogEntry, capacity)
done := make(chan interface{})
handler := &AsyncHandler{
middlewareHandler: newMiddlewareHandler(next),
wg: sync.WaitGroup{},
entries: entries,
done: done,
closed: false,
}
// start the goroutine to handle messages in the background
go func(handler Handler, entries chan LogEntry, done chan interface{}) {
for logEntry := range entries {
_ = handler.LogEntry(logEntry)
}
// the entries channels has been drained
close(done)
}(next, entries, done)
return handler
}
// LogEntry sends the log message asynchronously to the next handler.
// Please note the call will block when the buffer capacity is reached.
// It will return an error if there's no "next" handler, of if we called the Close method.
// Otherwise it will always return nil as it doesn't know if the message will be delivered.
func (h *AsyncHandler) LogEntry(logEntry LogEntry) error {
// make sure we don't keep sending messages to a closed channel
if h.closed {
return ErrHandlerClosed
}
if h.next == nil {
return ErrNoRegisteredHandler
}
h.entries <- logEntry
return nil
}
// SetPrefix sets a prefix on every log message
func (h *AsyncHandler) SetPrefix(prefix string) Handler {
if h.next == nil {
return h
}
prefixer, ok := h.next.(Prefixer)
if ok {
prefixer.SetPrefix(prefix)
}
return h
}
// Close blocks until all log messages have been delivered
func (h *AsyncHandler) Close() {
h.closed = true
close(h.entries)
<-h.done
}
// Verify interface
var (
_ Handler = &AsyncHandler{}
_ MiddlewareHandler = &AsyncHandler{}
)