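Every Kubernetes component runs periodic tasks of some kind, such as log processing, status polling, and cache maintenance. In Kubernetes these tasks are generally implemented with the `wait` package (`k8s.io/apimachinery/pkg/util/wait`). For example, this is how the kubelet's probe manager starts its probe-status sync loops: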
```go
// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
	// Start syncing startup.
	go wait.Forever(m.updateStartup, 0)
}
```
Periodic tasks in Go
Before looking at the Kubernetes `wait` package, let's see how a periodic task is implemented in plain Go.
Go's `time` package contains many time-related utilities, including two kinds of timers: `Ticker` and `Timer`.
Once a `Ticker` is created, it starts timing right away and fires at a fixed interval with no further action required:
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 5 * time.Second
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		// Blocks until the next tick, once every 5 seconds.
		<-ticker.C
		fmt.Println("Hello World")
	}
}
```
A `Timer`, by contrast, fires only once; to keep it firing, you must reset it after each expiry:
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 5 * time.Second
	timer := time.NewTimer(d)
	defer timer.Stop()
	for {
		// Blocks until the timer expires.
		<-timer.C
		fmt.Println("Hello World")
		// Re-arm the timer; it has fired and its value was received above.
		timer.Reset(d)
	}
}
```
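Note that for both kinds of timer, `.C` is a receive-only channel of type `<-chan Time`. In Go versions before 1.23 it is backed by a buffered channel with capacity 1, and the runtime delivers each tick with a non-blocking send: if the previous value has not been received yet, the new one is dropped rather than queued, so a slow receiver never faces a backlog of stale ticks. (Since Go 1.23, timer channels are unbuffered, and no stale value is delivered after a call to `Stop` or `Reset`.)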
```go
// Excerpt from src/time/sleep.go (Go versions before 1.23).
func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}
```
The Kubernetes wait package
Common APIs
The `wait` package provides several commonly used APIs for running a function periodically.
Run a function periodically, forever
```go
// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration)
```
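`Forever` takes a function and a period, and calls the function every period without ever stopping. With a period of 0, as in the kubelet example above, `f` is called again as soon as it returns.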
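To make the period-0 case concrete, here is a simplified, stdlib-only stand-in for `Forever`. The `forever` function is hypothetical: it drops the panic recovery of the real implementation and keeps only the "call `f`, then wait `period`" loop. The example assumes, as the kubelet's prober appears to do, that `f` blocks until there is new work, so a period of 0 turns the loop into an event loop instead of a busy spin.

```go
package main

import (
	"fmt"
	"time"
)

// forever is a hypothetical, simplified stand-in for wait.Forever:
// run f, wait period after it returns, and repeat with no way to stop.
// Unlike the real function, it does not recover from panics in f.
func forever(f func(), period time.Duration) {
	for {
		f()
		time.Sleep(period)
	}
}

func main() {
	updates := make(chan string)
	done := make(chan struct{})

	// f blocks on the channel, so a period of 0 does not spin the CPU.
	handle := func() {
		u := <-updates
		fmt.Println("handled:", u)
		if u == "last" {
			close(done)
		}
	}
	go forever(handle, 0)

	updates <- "first" // prints "handled: first"
	updates <- "last"  // prints "handled: last"
	<-done
}
```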
Run a function periodically until a stop signal arrives
```go
// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{})
```
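`Until` takes a function, a period, and a stop channel. It works like `Forever`, except that the loop ends once `stopCh` is closed; closing the channel, not sending a value on it, is the documented way to stop it.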
Poll a condition periodically
```go
// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error
```
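`Poll` calls `condition` every `interval` until it returns true, returns an error, or `timeout` is reached. `condition` is a `ConditionFunc`, i.e. `func() (done bool, err error)`, and per the doc comment it is always invoked at least once, after an initial wait of one interval.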
Poll a condition periodically until it succeeds or a stop signal arrives
```go
// PollUntil tries a condition func until it returns true, an error or stopCh is
// closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
```
Core implementation
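The periodic-execution APIs of the `wait` package, such as `Forever` and `Until` above, are built on `JitterUntil` (the `Poll` functions use a separate polling loop):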
```go
// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	var t *time.Timer
	var sawTimeout bool

	for {
		select {
		case <-stopCh:
			return
		default:
		}

		jitteredPeriod := period
		if jitterFactor > 0.0 {
			jitteredPeriod = Jitter(period, jitterFactor)
		}

		if !sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()

		if sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			return
		case <-t.C:
			sawTimeout = true
		}
	}
}
```
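`JitterUntil` takes five parameters:
Parameter | Type | Purpose |
---|---|---|
`f` | `func()` | The function to run periodically |
`period` | `time.Duration` | Interval between runs |
`jitterFactor` | `float64` | If greater than 0.0, each interval becomes a random value between `period` and `period + jitterFactor * period`; otherwise the interval is exactly `period` |
`sliding` | `bool` | If true, the interval starts after `f` returns, so the run time of `f` is not counted; if false, the interval includes the run time of `f` |
`stopCh` | `<-chan struct{}` | Channel that stops the loop when closed |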