How Kubernetes Implements Periodic Tasks

2019/10/11 14:51 PM posted in  Kubernetes

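Every Kubernetes component runs some periodic tasks, such as processing logs, querying tasks, and maintaining caches. These periodic tasks are implemented with the wait package (k8s.io/apimachinery/pkg/util/wait). For example, this is how the Kubelet starts its probe checks: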

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
	// Start syncing readiness.
	go wait.Forever(m.updateReadiness, 0)
	// Start syncing startup.
	go wait.Forever(m.updateStartup, 0)
}

Periodic tasks in Go

Before looking at the Kubernetes wait library, let's see how a periodic task is written in plain Go.

Go's time package contains many time-related utilities, including two kinds of timers: Ticker and Timer.

A Ticker starts counting as soon as it is created and fires at a fixed interval with no further action required.

package main

import (
	"fmt"
	"time"
)

func main() {

	d := time.Duration(time.Second * 5)
	ticker := time.NewTicker(d)

	defer ticker.Stop()

	for {
		<-ticker.C

		fmt.Println("Hello World")
	}

}

A Timer, by contrast, fires only once; after it fires it must be reset before it can fire again.

package main

import (
	"fmt"
	"time"
)

func main() {

	d := time.Duration(time.Second * 5)
	timer := time.NewTimer(d)

	defer timer.Stop()

	for {
		<-timer.C

		fmt.Println("Hello World")
		timer.Reset(d)
	}

}

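Note that for both kinds of timer, .C is a receive-only channel (<-chan Time) backed by a buffered channel with capacity 1, at least in the Go version shown here. The runtime delivers each tick with a non-blocking send, so if the previous tick has not been read yet the new one is dropped rather than queued. This guarantees that a slow receiver never sees a backlog of ticks.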

func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}
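The drop-instead-of-queue behaviour can be illustrated without any timers at all. The following is a minimal sketch, not the runtime's code: trySend is a hypothetical helper that performs a non-blocking send on a channel with capacity 1, which is the pattern assumed for tick delivery here.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it delivers v if the buffer has room
// and silently drops it otherwise, instead of blocking the sender.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1) // capacity 1, like the channel created in NewTimer

	fmt.Println(trySend(c, 1)) // buffer is empty, so the value is stored
	fmt.Println(trySend(c, 2)) // buffer is full, so the value is dropped
	fmt.Println(<-c)           // the receiver only ever sees the first value
}
```

Because the sender never blocks, a receiver that falls behind simply misses ticks; it does not get a burst of stale ones later.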

The Kubernetes wait library

Common APIs

The wait library provides several commonly used APIs for running a function periodically.

Run a function periodically, forever

// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration)

Forever takes a function and an interval, and calls the function periodically without ever stopping.

Run a function periodically until a stop signal arrives

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 

Until takes a function, an interval, and a channel that carries the stop signal. It behaves like Forever, except that the loop stops once stopCh is closed.

Periodically check a condition

// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error

Poll checks condition once every interval until it returns true, returns an error, or the timeout is reached.

Periodically check a condition until it succeeds or a stop signal arrives

// PollUntil tries a condition func until it returns true, an error or stopCh is
// closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

Core code

The periodic-task APIs in the wait library are built on top of JitterUntil.

// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	var t *time.Timer
	var sawTimeout bool

	for {
		select {
		case <-stopCh:
			return
		default:
		}

		jitteredPeriod := period
		if jitterFactor > 0.0 {
			jitteredPeriod = Jitter(period, jitterFactor)
		}

		if !sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()

		if sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			return
		case <-t.C:
			sawTimeout = true
		}
	}
}

The five parameters of JitterUntil:

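| Parameter | Type | Purpose |
| f | func() | The function to run periodically |
| period | time.Duration | The interval between runs |
| jitterFactor | float64 | If greater than 0.0, each interval becomes a random value between period and period + jitterFactor * period |
| sliding | bool | If true, the interval starts after f returns, so f's runtime is not counted; if false, the interval includes f's runtime |
| stopCh | <-chan struct{} | The channel that carries the stop signal; closing it stops the loop |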