-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontroller_scheduler.go
More file actions
338 lines (292 loc) · 11.3 KB
/
controller_scheduler.go
File metadata and controls
338 lines (292 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
/*
Copyright 2026 The ARCORIS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bufferpool
import (
"errors"
"sync"
"time"
)
// Messages for the errors returned by the scheduler start path. Each string is
// passed to newError together with ErrInvalidOptions by the validation and
// start helpers in this file.
const (
// Returned when a start is attempted while the runtime is already running.
errControllerSchedulerAlreadyRunning = "bufferpool.controllerSchedulerRuntime: scheduler already running"
// Returned when the start input carries an Interval that is zero or negative.
errControllerSchedulerInvalidInterval = "bufferpool.controllerSchedulerRuntime: interval must be positive"
// Returned when the start input carries a nil Tick callback.
errControllerSchedulerNilTick = "bufferpool.controllerSchedulerRuntime: tick function must not be nil"
// Returned when the ticker factory yields a nil ticker, or when a prepared
// start that owns no ticker is handed to the runtime.
errControllerSchedulerNilTicker = "bufferpool.controllerSchedulerRuntime: ticker factory returned nil"
)
// controllerSchedulerTicker is the small timer surface the scheduler loop
// needs. Production uses time.NewTicker through controllerSchedulerTimeTicker;
// tests provide a manual ticker so scheduler behavior can be driven without
// sleeps, real timers, or nondeterministic cadence.
type controllerSchedulerTicker interface {
// C returns the channel that delivers tick events. The scheduler loop calls
// it once when the goroutine starts and exits if the channel is closed.
C() <-chan time.Time
// Stop releases the ticker. It is called when the scheduler goroutine
// finishes and when a prepared start is discarded without being run.
Stop()
}
// controllerSchedulerTickerFactory constructs a ticker for one scheduler start.
//
// Factories are start-scoped rather than stored globally so owner tests can
// supply deterministic tickers while production code keeps the default
// time.NewTicker-backed implementation.
//
// The factory is called with the already-validated (positive) interval of the
// start input. A nil return value is rejected by
// prepareControllerSchedulerStart with an ErrInvalidOptions-classified error.
type controllerSchedulerTickerFactory func(time.Duration) controllerSchedulerTicker
// controllerSchedulerStartInput contains the cold control-plane dependencies
// needed to start one internal scheduler runtime.
//
// This is not public policy. PoolPartition and PoolGroup policy decide whether
// construction starts the runtime. Tick is intentionally the only owner
// callback; the scheduler does not allocate or retain controller reports, score
// diagnostics, trim candidates, samples, or snapshots.
type controllerSchedulerStartInput struct {
// Interval is the fixed ticker cadence for this scheduler run. It must be
// greater than zero; other values are rejected during preparation.
Interval time.Duration
// Tick performs one owner foreground controller cycle. It should normally be
// an owner Tick or TickInto wrapper, so all lifecycle gates, no-overlap
// behavior, status publication, and report construction remain owned by the
// existing controller path. A nil Tick is rejected during preparation.
Tick func() error
// IsClosedError classifies owner-close errors. Closed-owner errors stop the
// scheduler because later foreground cycles cannot be accepted. Other errors
// are ignored here because Tick/TickInto already publish retained
// lightweight status and return full diagnostics to direct callers. A nil
// classifier falls back to errors.Is(err, ErrClosed).
IsClosedError func(error) bool
// TickerFactory overrides the production ticker for deterministic tests. A
// nil factory uses time.NewTicker through newControllerSchedulerTicker.
TickerFactory controllerSchedulerTickerFactory
}
// controllerSchedulerPreparedStart is the result of a successful
// prepareControllerSchedulerStart call: a validated start input paired with the
// ticker that was constructed for it.
//
// The zero value owns no ticker; isZero reports that state and the runtime
// refuses to start from it.
type controllerSchedulerPreparedStart struct {
// input is the validated start input. Its TickerFactory field holds the
// factory that actually produced ticker (the default one when the caller
// supplied nil).
input controllerSchedulerStartInput
// ticker is the ticker owned by this prepared start until it is handed to
// the scheduler goroutine or released through stop.
ticker controllerSchedulerTicker
}
// isZero reports whether p holds no ticker, which means there is nothing to
// hand to a runtime and nothing for stop to release.
func (p controllerSchedulerPreparedStart) isZero() bool {
	hasTicker := p.ticker != nil
	return !hasTicker
}
// stop releases the ticker of a prepared start that will not be run.
//
// Preparation creates the ticker, so from that moment the prepared start is its
// owner. Every caller must either pass the value on to
// startPrepared/replacePrepared, which take over ownership, or call stop on
// each remaining error path. Calling stop on a value without a ticker is a
// no-op.
func (p controllerSchedulerPreparedStart) stop() {
	if p.ticker == nil {
		return
	}
	p.ticker.Stop()
}
// controllerSchedulerRuntime is an owner-local internal scheduler primitive.
//
// It only owns start/stop mechanics and ticker dispatch. It does not interpret
// policy, does not publish runtime snapshots, does not retain full reports, and
// does not decide whether a PoolPartition or PoolGroup should enable
// scheduling. Owner integration calls Start explicitly after construction policy
// validation and calls Stop before cleanup locks that Tick may need.
//
// The primitive can be started again after Stop because that is a lifecycle-safe
// property of the internal runtime. PoolPartition and PoolGroup use that
// property for owner-local PublishPolicy reconfiguration while still routing all
// scheduled work through TickInto.
//
// The struct embeds a sync.Mutex by value, so it must not be copied after first
// use; all methods take a pointer receiver.
type controllerSchedulerRuntime struct {
// mu guards every field below.
mu sync.Mutex
// running is true from a successful startPrepared until the scheduler
// goroutine's finish step clears it.
running bool
// stopping is true once Stop has closed the stop channel for the current
// run; it keeps repeated Stop calls from closing the channel twice.
stopping bool
// stop is closed by Stop to ask the current scheduler goroutine to exit. It
// is nil while no run is active.
stop chan struct{}
// done is closed by the scheduler goroutine when it has finished. Stop waits
// on it. It is nil while no run is active.
done chan struct{}
}
// Start launches one scheduler goroutine for input.
//
// Only internal runtime wiring is validated here: the interval must be
// positive, Tick must be non-nil, and the ticker factory must produce a usable
// ticker. Calling Start while a scheduler is already running returns a stable
// ErrInvalidOptions-classified error instead of silently starting a second
// loop, because overlapping loops would overlap owner controller cycles and
// make shutdown ownership ambiguous.
func (r *controllerSchedulerRuntime) Start(input controllerSchedulerStartInput) error {
	// Early check so that an obviously redundant Start does not construct a
	// ticker at all. startPrepared repeats the check under the lock, which is
	// what actually protects against a concurrent start.
	r.mu.Lock()
	alreadyRunning := r.running
	r.mu.Unlock()

	if alreadyRunning {
		return newError(ErrInvalidOptions, errControllerSchedulerAlreadyRunning)
	}

	next, prepareErr := prepareControllerSchedulerStart(input)
	if prepareErr != nil {
		return prepareErr
	}
	return r.startPrepared(next)
}
// Replace swaps the runtime over to a new ticker and interval.
//
// The new run is prepared first, so a rejected input or a bad ticker factory
// returns an error while the current scheduler keeps working. Only after
// preparation succeeded is the old goroutine stopped (Replace waits for it to
// exit through Stop) and the prepared run started.
//
// The caller must not hold owner locks that Tick may need.
func (r *controllerSchedulerRuntime) Replace(input controllerSchedulerStartInput) error {
	next, prepareErr := prepareControllerSchedulerStart(input)
	if prepareErr != nil {
		// Nothing has been stopped yet; the current run is untouched.
		return prepareErr
	}
	return r.replacePrepared(next)
}
// prepareControllerSchedulerStart validates input and constructs the ticker for
// one scheduler run.
//
// It rejects a non-positive Interval and a nil Tick before any ticker is
// created, substitutes newControllerSchedulerTicker for a nil TickerFactory,
// and rejects a factory that returns nil. All failures are
// ErrInvalidOptions-classified and come with a zero prepared start.
//
// On success the returned value owns the new ticker; the caller must either
// hand it to startPrepared/replacePrepared or release it with stop.
func prepareControllerSchedulerStart(input controllerSchedulerStartInput) (controllerSchedulerPreparedStart, error) {
	var none controllerSchedulerPreparedStart

	switch {
	case input.Interval <= 0:
		return none, newError(ErrInvalidOptions, errControllerSchedulerInvalidInterval)
	case input.Tick == nil:
		return none, newError(ErrInvalidOptions, errControllerSchedulerNilTick)
	}

	// input is a by-value copy, so recording the effective factory here is
	// visible only through the returned prepared start.
	if input.TickerFactory == nil {
		input.TickerFactory = newControllerSchedulerTicker
	}

	created := input.TickerFactory(input.Interval)
	if created == nil {
		return none, newError(ErrInvalidOptions, errControllerSchedulerNilTicker)
	}

	return controllerSchedulerPreparedStart{input: input, ticker: created}, nil
}
// startPrepared takes ownership of prepared and launches the scheduler
// goroutine for it.
//
// The whole transition happens under mu. If a scheduler is already running, the
// prepared ticker is released and an ErrInvalidOptions-classified error is
// returned. A prepared start without a ticker is rejected as well. Otherwise
// fresh stop/done channels are installed, the runtime is marked running, and
// the goroutine is started with the prepared ticker.
func (r *controllerSchedulerRuntime) startPrepared(prepared controllerSchedulerPreparedStart) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	switch {
	case r.running:
		// Ownership was passed to us, so the unused ticker is ours to release.
		prepared.stop()
		return newError(ErrInvalidOptions, errControllerSchedulerAlreadyRunning)
	case prepared.isZero():
		return newError(ErrInvalidOptions, errControllerSchedulerNilTicker)
	}

	stopCh, doneCh := make(chan struct{}), make(chan struct{})
	r.stop, r.done = stopCh, doneCh
	r.running, r.stopping = true, false

	// The goroutine receives its own copies of the channels so that a later
	// run replacing r.stop/r.done cannot affect this one.
	in := prepared.input
	go r.run(in.Tick, in.IsClosedError, prepared.ticker, stopCh, doneCh)
	return nil
}
// replacePrepared stops the current scheduler run, if any, and then starts the
// already-prepared one.
//
// A prepared start without a ticker is rejected before anything is stopped, so
// the current run keeps going in that case. Stop blocks until the previous
// goroutine has exited; the caller must not hold owner locks that Tick may
// need.
func (r *controllerSchedulerRuntime) replacePrepared(prepared controllerSchedulerPreparedStart) error {
	if noTicker := prepared.isZero(); noTicker {
		return newError(ErrInvalidOptions, errControllerSchedulerNilTicker)
	}

	r.Stop()
	return r.startPrepared(prepared)
}
// Stop requests scheduler shutdown and waits until the scheduler goroutine has
// exited. It is idempotent and may be called before Start, in which case it
// returns immediately.
//
// The scheduler-owned stop channel is closed while mu is held, and mu is
// released before waiting on done. That ordering matters for owner
// integration: callers must not hold owner locks that Tick may need while they
// wait in Stop, and Stop itself must not hold the scheduler mutex across the
// goroutine exit path, which takes the same mutex.
//
// Stop must not be called from inside the Tick callback: the callback runs on
// the scheduler goroutine, so Stop would wait for a goroutine that cannot exit
// until Stop returns.
func (r *controllerSchedulerRuntime) Stop() {
	var (
		wait    chan struct{}
		running bool
	)

	r.mu.Lock()
	if running = r.running; running {
		// Only the first Stop of a run closes the channel; later callers just
		// wait for the same done channel.
		if !r.stopping {
			close(r.stop)
			r.stopping = true
		}
		wait = r.done
	}
	r.mu.Unlock()

	if !running {
		return
	}
	<-wait
}
// isRunning reports whether a scheduler goroutine is currently active.
//
// The method is deliberately unexported: it supports deterministic unit tests
// and owner assertions without turning scheduler state into public API. The
// answer is a snapshot taken under mu and may be stale by the time the caller
// looks at it.
func (r *controllerSchedulerRuntime) isRunning() bool {
	r.mu.Lock()
	active := r.running
	r.mu.Unlock()
	return active
}
// run owns the scheduler goroutine.
//
// The loop dispatches Tick only from ticker events and ignores ordinary tick
// errors. Closed-owner errors, as classified by
// controllerSchedulerIsClosedError, end the loop because the owner can no
// longer accept foreground controller work. A closed ticker channel ends the
// loop as well.
//
// A stop request takes priority over a pending ticker event. select chooses
// pseudo-randomly among ready cases, so after a tick is received the loop
// re-checks stop before calling Tick. Without that check a Stop caller could
// be made to wait for one more controller cycle that was dispatched only
// because the tick and the stop request became ready at the same time.
//
// finish is deferred once the ticker channel has been obtained, so every
// return path stops the ticker and closes done exactly once, including when
// the owner closes itself during a Tick call.
//
// Parameters:
//   - tick: the owner callback invoked for every ticker event.
//   - isClosedError: optional classifier for closed-owner errors; nil selects
//     the default classification.
//   - ticker: the ticker owned by this run; it is stopped on exit.
//   - stop: closed by Stop to request shutdown.
//   - done: closed on exit to release Stop waiters.
func (r *controllerSchedulerRuntime) run(
	tick func() error,
	isClosedError func(error) bool,
	ticker controllerSchedulerTicker,
	stop <-chan struct{},
	done chan struct{},
) {
	tickerC := ticker.C()
	defer r.finish(ticker, done)

	for {
		select {
		case <-stop:
			return
		case _, ok := <-tickerC:
			if !ok {
				return
			}
			// The tick may have been picked although stop was ready too; do
			// not start a controller cycle once shutdown has been requested.
			select {
			case <-stop:
				return
			default:
			}
			if err := tick(); err != nil && controllerSchedulerIsClosedError(isClosedError, err) {
				return
			}
		}
	}
}
// finish stops the ticker, marks the runtime stopped, and releases Stop
// waiters.
//
// The state reset and the close of done both happen while mu is held. A
// concurrent Stop therefore either still sees a running scheduler and waits on
// this done channel, or sees a fully stopped runtime; it can never return
// before the goroutine has completed its shutdown bookkeeping.
//
// The runtime fields are reset only if r.done is still the channel this run
// was started with, so a finishing run never clears state that belongs to a
// different run.
func (r *controllerSchedulerRuntime) finish(ticker controllerSchedulerTicker, done chan struct{}) {
	ticker.Stop()

	r.mu.Lock()
	defer r.mu.Unlock()

	if ownsState := r.done == done; ownsState {
		r.stop, r.done = nil, nil
		r.running, r.stopping = false, false
	}
	close(done)
}
// controllerSchedulerIsClosedError decides whether err means the owner is
// closed.
//
// A nil err is never a closed error. When the owner supplied a classifier, its
// verdict is returned unchanged. Without a classifier the function falls back
// to errors.Is(err, ErrClosed), which matches the existing owner lifecycle
// contract and keeps the start input compact for tests that do not need custom
// classification.
func controllerSchedulerIsClosedError(isClosedError func(error) bool, err error) bool {
	switch {
	case err == nil:
		return false
	case isClosedError == nil:
		return errors.Is(err, ErrClosed)
	default:
		return isClosedError(err)
	}
}
// controllerSchedulerTimeTicker adapts time.Ticker to controllerSchedulerTicker.
//
// Values are created by newControllerSchedulerTicker. The zero value is not
// usable: C dereferences the wrapped ticker pointer.
type controllerSchedulerTimeTicker struct {
// ticker is the wrapped standard-library ticker.
ticker *time.Ticker
}
// newControllerSchedulerTicker creates the production ticker.
//
// It is the only production timer construction in the scheduler foundation.
// Owner constructors reach it only through explicit opt-in scheduler policy;
// the default manual policy path never constructs a ticker or starts a
// goroutine.
//
// time.NewTicker panics for a non-positive interval;
// prepareControllerSchedulerStart rejects such intervals before it calls the
// factory.
func newControllerSchedulerTicker(interval time.Duration) controllerSchedulerTicker {
	adapter := controllerSchedulerTimeTicker{ticker: time.NewTicker(interval)}
	return adapter
}
// C returns the receive side of the wrapped time.Ticker's channel.
func (t controllerSchedulerTimeTicker) C() <-chan time.Time {
	underlying := t.ticker
	return underlying.C
}
// Stop releases the production ticker by stopping the wrapped time.Ticker.
func (t controllerSchedulerTimeTicker) Stop() {
	underlying := t.ticker
	underlying.Stop()
}