How Kubernetes Implements Periodic Tasks

2019/10/11 14:51, posted in Kubernetes

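Every Kubernetes component runs some periodic tasks, such as log processing, task queries, and cache maintenance. Most of them are built on the wait package (k8s.io/apimachinery/pkg/util/wait). For example, the Kubelet uses it to start its probe checks: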

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
    // Start syncing readiness.
    go wait.Forever(m.updateReadiness, 0)
    // Start syncing startup.
    go wait.Forever(m.updateStartup, 0)
}

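Periodic tasks in Go

Before looking at the Kubernetes wait package, let's see how a periodic task is written in plain Go.

Go's time package contains many time-related utilities, including two timers: Ticker and Timer.

Once created, a Ticker starts counting immediately and fires at a fixed interval with no further action required.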

package main

import (
    "fmt"
    "time"
)

func main() {

    d := time.Duration(time.Second * 5)
    ticker := time.NewTicker(d)

    defer ticker.Stop()

    for {
        <-ticker.C

        fmt.Println("Hello World")
    }

}

A Timer, by contrast, fires only once. It has to be reset after it fires in order to fire again.

package main

import (
    "fmt"
    "time"
)

func main() {

    d := time.Duration(time.Second * 5)
    timer := time.NewTimer(d)

    defer timer.Stop()

    for {
        <-timer.C

        fmt.Println("Hello World")
        timer.Reset(d)
    }

}

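Note that for both timers, .C is a receive-only channel of type <-chan Time. In the Go version quoted below, it is backed by a buffered channel with capacity 1, and the runtime delivers each tick with a non-blocking send: if an earlier tick is still sitting unread in the buffer, the new one is dropped rather than queued. A slow receiver therefore never sees a backlog of ticks.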

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{
            when: when(d),
            f:    sendTime,
            arg:  c,
        },
    }
    startTimer(&t.r)
    return t
}
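
The drop-on-full behavior of a capacity-1 channel can be illustrated without any timers. The sketch below is not the runtime's code; trySend is a hypothetical helper that only mirrors the pattern of a non-blocking send into a buffered channel.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it delivers v if the buffer has room
// and drops it otherwise, without ever blocking the sender.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1) // same capacity as the timer channel

	fmt.Println(trySend(c, 1)) // true: the buffer was empty
	fmt.Println(trySend(c, 2)) // false: the buffer is full, the value is dropped
	fmt.Println(<-c)           // 1: only the first value was kept
	fmt.Println(trySend(c, 3)) // true: the buffer has room again
}
```

The sender never waits for the receiver, which is why a slow consumer of a timer channel misses ticks instead of accumulating them.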

The Kubernetes wait package

Common APIs

The wait package provides several commonly used APIs for running functions on a schedule.

Run a function periodically, forever

// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration)

Forever takes a function and a period, and runs the function repeatedly at that interval. It never stops.

Run a function periodically until a stop signal arrives

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 

Until takes a function, a period, and a stop channel. It behaves like Forever, except that closing stopCh stops the loop.
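
To make the stop-channel contract concrete, here is a minimal sketch using only the standard library. The function until is a simplified, hypothetical stand-in, not the code from the wait package: it ignores jitter and crash handling and always starts the wait after f returns.

```go
package main

import (
	"fmt"
	"time"
)

// until is a simplified, hypothetical stand-in for wait.Until: it runs f,
// waits for period, and repeats until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for a stop signal before every run of f.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		// Wait for the next period, or return early on a stop signal.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	count := 0

	until(func() {
		count++
		if count == 3 {
			close(stopCh) // closing the channel is the stop signal
		}
	}, 10*time.Millisecond, stopCh)

	fmt.Println("f ran", count, "times") // f ran 3 times
}
```

Passing a channel that is never closed gives the behavior of Forever.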

Periodically check a condition

// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error

Poll checks condition once every interval until it returns true, returns an error, or the timeout elapses.
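
The polling contract described in the comment can be sketched with the standard library alone. The function poll and the error value errTimeout below are hypothetical illustrations, not the package's implementation; the condition has the same shape as ConditionFunc, a function returning (bool, error).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel error used only by this sketch.
var errTimeout = errors.New("timed out waiting for the condition")

// poll is a simplified, hypothetical stand-in for wait.Poll. It waits for
// interval before every check, so condition runs at least once.
func poll(interval, timeout time.Duration, condition func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		time.Sleep(interval)
		done, err := condition()
		if err != nil {
			return err // the condition failed: stop polling
		}
		if done {
			return nil // the condition is satisfied
		}
		if time.Now().After(deadline) {
			return errTimeout
		}
	}
}

func main() {
	attempts := 0
	err := poll(5*time.Millisecond, time.Second, func() (bool, error) {
		attempts++
		return attempts >= 3, nil // succeed on the third check
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```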

Periodically check a condition until it succeeds or a stop signal arrives

// PollUntil tries a condition func until it returns true, an error or stopCh is
// closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

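Core code

As the comments above note, Forever wraps Until and Until wraps JitterUntil, so the periodic-execution APIs all come down to JitterUntil.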

// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
    var t *time.Timer
    var sawTimeout bool

    for {
        select {
        case <-stopCh:
            return
        default:
        }

        jitteredPeriod := period
        if jitterFactor > 0.0 {
            jitteredPeriod = Jitter(period, jitterFactor)
        }

        if !sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        func() {
            defer runtime.HandleCrash()
            f()
        }()

        if sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        // NOTE: b/c there is no priority selection in golang
        // it is possible for this to race, meaning we could
        // trigger t.C and stopCh, and t.C select falls through.
        // In order to mitigate we re-check stopCh at the beginning
        // of every loop to prevent extra executions of f().
        select {
        case <-stopCh:
            return
        case <-t.C:
            sawTimeout = true
        }
    }
}
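
The effect of the sliding flag is easiest to see with numbers. The sketch below is a model, not library code: cycle is a hypothetical helper that computes the time between two consecutive starts of f, ignoring jitter.

```go
package main

import (
	"fmt"
	"time"
)

// cycle is a hypothetical helper that models the time between two
// consecutive starts of f in a JitterUntil-style loop (jitter ignored).
func cycle(period, runFor time.Duration, sliding bool) time.Duration {
	if sliding {
		// The timer is started after f returns.
		return runFor + period
	}
	// The timer is started before f runs. If f outlasts the timer,
	// the next run starts as soon as f returns.
	if runFor > period {
		return runFor
	}
	return period
}

func main() {
	period, runFor := time.Second, 300*time.Millisecond

	fmt.Println(cycle(period, runFor, true))  // 1.3s
	fmt.Println(cycle(period, runFor, false)) // 1s
}
```

With sliding = true a slow f stretches the schedule; with sliding = false the schedule holds as long as f finishes within one period.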

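JitterUntil takes five parameters:

| Parameter | Type | Purpose |
| --- | --- | --- |
| f | func() | The function to run periodically |
| period | time.Duration | The interval between runs |
| jitterFactor | float64 | If greater than 0.0, each interval becomes a random value between period and period + jitterFactor * period |
| sliding | bool | If true, the interval is measured from the moment f returns, so f's run time does not count toward it; if false, the interval includes f's run time |
| stopCh | <-chan struct{} | The channel that carries the stop signal; close it to stop the loop |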
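
The jittered interval can be sketched as follows. The function jitter is a hypothetical illustration of the range described for jitterFactor, not the package's Jitter function; it leaves the period unchanged when the factor is not positive, matching how JitterUntil treats that case.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter is a hypothetical sketch: it returns a random duration in
// [period, period + factor*period). A non-positive factor means no jitter.
func jitter(period time.Duration, factor float64) time.Duration {
	if factor <= 0 {
		return period
	}
	extra := rand.Float64() * factor * float64(period)
	return period + time.Duration(extra)
}

func main() {
	// Each call yields a value between 1s and 1.5s.
	for i := 0; i < 3; i++ {
		fmt.Println(jitter(time.Second, 0.5))
	}
}
```

Spreading the interval this way keeps many clients that started at the same moment from hitting a shared resource in lockstep.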