-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanager.go
More file actions
72 lines (63 loc) · 1.71 KB
/
manager.go
File metadata and controls
72 lines (63 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package taskflow
import (
"context"
"fmt"
"github.com/google/uuid"
"sync"
"time"
)
type Manager struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
handlers map[Operation]JobHandler
advancedHandlers map[Operation]AdvancedJobConstructor
wakeup chan struct{}
}
// startWorkersInternal is basically your old StartWorkers logic
func startWorkersInternal(ctx context.Context, count int, cfg *Config, handlers map[Operation]JobHandler, advHandlers map[Operation]AdvancedJobConstructor) *Manager {
mgrCtx, cancel := context.WithCancel(ctx)
mgr := &Manager{
cfg: cfg,
ctx: mgrCtx,
cancel: cancel,
handlers: handlers,
advancedHandlers: advHandlers,
wakeup: make(chan struct{}, count),
}
cfg.logInfo(LogEvent{
Message: fmt.Sprintf("Starting %d workers...", count),
})
for i := 0; i < count; i++ {
w := &Worker{
id: uuid.New().String(),
cfg: cfg,
manager: mgr,
}
mgr.wg.Add(1)
go func(worker *Worker) {
defer mgr.wg.Done()
worker.Run(mgr.ctx)
}(w)
}
return mgr
}
// Shutdown attempts a graceful shutdown: cancel context, wait for workers up to 'timeout'.
func (m *Manager) Shutdown(timeout time.Duration) {
m.cfg.logInfo(LogEvent{Message: "Shutdown requested. Stopping workers..."})
m.cancel()
doneCh := make(chan struct{})
go func() {
m.wg.Wait()
close(doneCh)
}()
select {
case <-doneCh:
m.cfg.logInfo(LogEvent{Message: "All workers exited cleanly."})
case <-time.After(timeout):
m.cfg.logError(LogEvent{
Message: fmt.Sprintf("Shutdown timed out after %v. Some workers may still be running.", timeout),
})
}
}