Kubernetes Controller Manager 工作原理

本文基于对 Kubernetes v1.16 的源码阅读,文章有一定的源码,但我会通过配图尽量描述清晰

在 Kubernetes Master 节点中,有三个重要组件:ApiServer、ControllerManager、Scheduler,它们一起承担了整个集群的管理工作。本文尝试梳理清楚 ControllerManager 的工作流程和原理。

什么是 Controller Manager

根据官方文档的说法:kube-controller-manager 运行控制器,它们是处理集群中常规任务的后台线程。

说白了,Controller Manager 就是集群内部的管理控制中心,由负责不同资源的多个 Controller 构成,共同负责集群内的 Node、Pod 等所有资源的管理,比如当通过 Deployment 创建的某个 Pod 发生异常退出时,RS Controller 便会接受并处理该退出事件,并创建新的 Pod 来维持预期副本数。

几乎每种特定资源都有特定的 Controller 维护管理以保持预期状态,而 Controller Manager 的职责便是把所有的 Controller 聚合起来:

  1. 提供基础设施降低 Controller 的实现复杂度
  2. 启动和维持 Controller 的正常运行

可以这么说,Controller 保证集群内的资源保持预期状态,而 Controller Manager 保证了 Controller 保持在预期状态。

Controller 工作流程

在讲解 Controller Manager 怎么为 Controller 提供基础设施和运行环境之前,我们先了解一下 Controller 的工作流程是什么样子的。

从比较高维度的视角看,Controller Manager 主要提供了一个分发事件的能力,而不同的 Controller 只需要注册对应的 Handler 来等待接收和处理事件。

以 Deployment Controller 举例,在 pkg/controller/deployment/deployment_controller.goNewDeploymentController 方法中,便包括了 Event Handler 的注册,对于 Deployment Controller 来说,只需要根据不同的事件实现不同的处理逻辑,便可以实现对相应资源的管理。

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addDeployment,
    UpdateFunc: dc.updateDeployment,
    // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
    DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addReplicaSet,
    UpdateFunc: dc.updateReplicaSet,
    DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: dc.deletePod,
})

可以看到,在 Controller Manager 的帮助下,Controller 的逻辑可以做的非常纯粹,只需要实现相应的 EventHandler 即可,那么 Controller Manager 都做了哪些具体的工作呢?

Controller Manager 架构

辅助 Controller Manager 完成事件分发的是 client-go,而其中比较关键的模块便是 informer。

kubernetes 在 github 上提供了一张 client-go 的架构图,从中可以看出,Controller 正是下半部分(CustomController)描述的内容,而 Controller Manager 主要完成的是上半部分。

Informer 工厂

从上图可以看到 Informer 是一个非常关键的 “桥梁” 作用,因此对 Informer 的管理便是 Controller Manager 要做的第一件事。

在 Controller Manager 启动时,便会创建一个名为 SharedInformerFactory 的单例工厂,因为每个 Informer 都会与 Api Server 维持一个 watch 长连接,所以这个单例工厂通过为所有 Controller 提供了唯一获取 Informer 的入口,来保证每种类型的 Informer 只被实例化一次。

该单例工厂的初始化逻辑:

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    factory := &sharedInformerFactory{
        client:           client,
        namespace:        v1.NamespaceAll,
        defaultResync:    defaultResync,
        informers:        make(map[reflect.Type]cache.SharedIndexInformer),
        startedInformers: make(map[reflect.Type]bool),
        customResync:     make(map[reflect.Type]time.Duration),
    }

    // Apply all options
    for _, opt := range options {
        factory = opt(factory)
    }

    return factory
}

从上面的初始化逻辑中可以看到,sharedInformerFactory 中最重要的是名为 informers 的 map,其中 key 为资源类型,而 value 便是关注该资源类型的 Informer。每种类型的 Informer 只会被实例化一次,并存储在 map 中,不同 Controller 需要相同资源的 Informer 时只会拿到同一个 Informer 实例。

对于 Controller Manager 来说,维护所有的 Informer 使其正常工作,是保证所有 Controller 正常工作的基础条件。sharedInformerFactory 通过该 map 维护了所有的 informer 实例,因此,sharedInformerFactory 也承担了提供统一启动入口的职责:

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

当 Controller Manager 启动时,最重要的就是通过该工厂的 Start 方法,将所有的 Informer 运行起来。

Informer 的创建

下面看下这些 Informer 是怎么被创建的。Controller Manager 在 cmd/kube-controller-manager/app/controllermanager.goNewControllerInitializers 函数中初识化了所有的 Controller,因为代码冗长,这里仅拿 Deployment Controller 举例子。

初始化 Deployment Controller 的逻辑在 cmd/kube-controller-manager/app/apps.gostartDeploymentController 的函数中:

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

最关键的逻辑在 deployment.NewDeploymentController 上,该函数真正创建了 Deployment Controller,而该创建函数的前三个参数分别为 Deployment、ReplicaSet、Pod 的 Informer。可以看到,Informer 的单例工厂以 ApiGroup 为路径提供了不同资源的 Informer 创建入口。

不过要注意的是,.Apps().V1().Deployments() 虽然返回的是 deploymentInformer 类型的实例,但是,deploymentInformer 其实并不是一个真正的 Informer(尽管他以 Informer 命名),它只是一个模板类,主要功能是提供关注 Deployment 这一特定资源 Informer 的创建模板:

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
    return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

真正创建 Informer 的逻辑是在 deploymentInformer.Informer() 中(client-go/informers/apps/v1/deployment.go),f.defaultInformer 是默认的 Deployment Informer 创建模板方法,通过将资源实例和该模板方法传入 Informer 工厂的 InformerFor 方法,来创建仅关注 Deployment 资源的 Informer:

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

简单梳理一下:

  1. 可以通过 Informer 工厂获得特定类型的 Informer 模板类(即这里的 deploymentInformer
  2. 真正创建该特定资源 Informer 的是 Informer 模板类的 Informer() 方法
  3. Informer() 方法只不过是通过 Informer 工厂的 InformerFor 来创建真正的 Informer

这里用到了模板方法(设计模式),虽然有一点绕口,但可以参考下图梳理一下,理解关键在于 Informer 的 差异化的创建逻辑下放给了模板类

最后,名为 sharedIndexInformer 的结构体将被实例化,并真正的承担 Informer 的职责。被注册到 Informer 工厂 map 中的也是该实例。

Informer 的运行

因为真正的 Informer 实例是一个 sharedIndexInformer 类型的对象,当 Informer 工厂启动时(执行 Start 方法),被真正运行起来的是 sharedIndexInformer

sharedIndexInformer 是 client-go 里的组件,它的 Run 方法虽然短短几十行,但是却承担了很多工作。到这里,才到了 Controller Manager 最有趣的部分。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

sharedIndexInformer 的启动逻辑主要做了下面几件事:

  1. 创建了名为 fifo 的队列
  2. 创建并运行了一个名为 controller 的实例
  3. 启动了 cacheMutationDetector
  4. 启动了 processor

这几个名词(或者说组件)前文并没有提到过,而这四件事情是 Controller Manager 工作的核心内容,因此下面我会分别介绍。

sharedIndexInformer

sharedIndexInformer 是一个共享的 Informer 框架,不同的 Controller 只需要提供一个模板类(比如上文提到的 deploymentInformer ),便可以创建一个符合自己需求的特定 Informer。

sharedIndexInformer 包含了一堆工具来完成 Informer 的任务,其主要代码在 client-go/tools/cache/shared_informer.go 中。其创建逻辑也在其中:

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      objType,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

在创建逻辑中,有几个东西需要留意:

  1. processor:提供了 EventHandler 注册和事件分发的功能
  2. indexer:提供了资源缓存的功能
  3. listerWatcher:由模板类提供,包含特定资源的 List 和 Watch 方法
  4. objectType:用来标记关注哪种特定资源类型
  5. cacheMutationDetector:监控 Informer 的缓存

除此之外,还包含了上文启动逻辑中提到了 DeltaFIFO 队列和 controller,下面就分别介绍。

sharedProcessor

processor 是 sharedIndexInformer 中一个非常有趣的组件,Controller Manager 通过一个 Informer 单例工厂来保证不同的 Controller 共享了同一个 Informer,但是不同的 Controller 对该共享的 Informer 注册的 Handler 不同,那么 Informer 应该怎么管理被注册的 Handler 呢?

processor 便是用来管理被注册的 Handler 以及将事件分发给不同 Handler 的组件。

type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

sharedProcessor 的工作核心是围绕着 listeners 这个 Listener 切片展开的。

当我们注册一个 Handler 到 Informer 时,最终会被转换为一个名为 processorListener 结构体的实例:

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
    ret := &processorListener{
        nextCh:                make(chan interface{}),
        addCh:                 make(chan interface{}),
        handler:               handler,
        pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
        requestedResyncPeriod: requestedResyncPeriod,
        resyncPeriod:          resyncPeriod,
    }

    ret.determineNextResync(now)

    return ret
}

该实例主要包含两个 channel 和外面注册的 Handler 方法。而此处被实例化的 processorListener 对象最终会被添加到 sharedProcessor.listeners 列表中:

func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

如图所示,Controller 中的 Handler 方法最终会被添加到 Listener 中,而 Listener 将会被 append 到 sharedProcessor 的 Listeners 切片中。

前文提到,sharedIndexInformer 启动时会将 sharedProcessor 运行起来,而 sharedProcessor 的启动逻辑便是和这些 listener 有关:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看到,sharedProcessor 启动时会依次执行 listenerrunpop 方法,我们现在看下这两个方法。

listener 的启动

因为 listener 包含了 Controller 注册进来的 Handler 方法,因此 listener 最重要的职能就是当事件发生时来触发这些方法,而 listener.run 就是不停的从 nextCh 这个 channel 中拿到事件并执行对应的 handler:

func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

可以看到,listener.run 不停的从 nextCh 这个 channel 中拿到事件,但是 nextCh 这个 channel 里的事件又是从哪来的呢?listener.pop 的职责便是将事件放入 nextCh 中。

listener.pop 是一段非常精巧和有趣的逻辑:

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

listener 之所以包含了两个 channel:addChnextCh,是因为 Informer 无法预知 listener.handler 的事件消费的速度是否大于事件生产的速度,因此添加了一个名为 pendingNotifications 的缓冲队列来保存未来得及消费的事件。

pop 方法一方面会不停的从 addCh 中获得最新事件,以保证不会让生产方阻塞。然后判断是否存在 buffer,如果存在则把事件添加到 buffer 中,如果不存在则尝试推给 nextCh

而另一方面,会判断 buffer 中是否还有事件,如果还有存量,则不停的传递给 nextCh

pop 方法实现了一个带 buffer 的分发机制,使得事件可以源源不断的从 addChnextCh。但是问题来了,那 addCh 的事件从哪来呢。

其实来源非常简单,listener 有一个 add 方法,入参是一个事件,该方法会将新事件推入 addCh 中。而调用该 add 方法的是管理所有 listenersharedProcessor

上面提到过,sharedProcessor 的职责便是管理所有的 Handler 以及分发事件,而真正做分发工作的是 distribute 方法:

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

到目前为止,我们有一部分比较清晰了:

  1. Controller 将 Handler 注册给 Informer
  2. Informer 通过 sharedProcessor 维护了所有的 Handler(listener)
  3. Informer 收到事件时,通过 sharedProcessor.distribute 将事件分发下去
  4. Controller 被触发对应的 Handler 来处理自己的逻辑

那么剩下的问题就是 Informer 的事件从哪来呢?

DeltaFIFO

在分析 Informer 获取事件之前,需要提前讲一个非常有趣的小工具,就是在 sharedIndexInformer.Run 的时候创建的 fifo 队列:

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO 是一个非常有趣的队列,相关代码定义在 client-go/tools/cache/delta_fifo.go 中。对于一个队列来说,最重要的肯定是 Add 方法和 Pop 方法,DeltaFIFO 提供了多个 Add 方法,虽然根据不同的事件类型(add/update/delete/sync)区分不同的方法,但是最终都会执行 queueActionLocked

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // If object is supposed to be deleted (last event is Deleted),
    // then we should ignore Sync events, because it would result in
    // recreation of this object.
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }

    newDeltas := append(f.items[id], Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        // We need to remove this from our map (extra items in the queue are
        // ignored if they are not in the map).
        delete(f.items, id)
    }
    return nil
}

queueActionLocked 方法的第一个参数 actionType 便是事件类型:

const (
    Added   DeltaType = "Added"   // watch api 获得的创建事件
    Updated DeltaType = "Updated" // watch api 获得的更新事件
    Deleted DeltaType = "Deleted" // watch api 获得的删除事件
    Sync DeltaType = "Sync"       // 触发了 List Api,需要刷新缓存
)

从事件类型以及入队列方法可以看出,这是一个带有业务功能的队列,并不是单纯的“先入先出”,入队列方法中有两个非常精巧的设计:

  1. 入队列的事件会先判断该资源是否存在未被消费的事件,然后适当处理
  2. 如果 list 方法时发现该资源已经被删除了,则不再处理

第二点比较好理解,如果触发了 List 请求,而且发现要被处理的资源已经被删除了,则就不需要再入队列处理。而第一点需要结合出队列方法一起来看:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

DeltaFIFO 的 Pop 方法有一个入参,即是处理函数,出队列时,DeltaFIFO 会先根据资源 id 获得该资源 所有的事件,然后交给处理函数。

工作流程如图所示:

总体来看,DeltaFIFO 的入队列方法,会先判断该资源是否已经在 items 中, 如果已经存在,说明该资源还没有被消费(还在 queue 中排队),则直接将事件 append 到 items[resource_id] 中即可。如果发现不在 items 中,便会创建 items[resource_id],并将资源 id append 到 queue 中。

而 DeltaFIFO 出队列方法,会从 queue 中拿到队列最前面的资源 id,然后从 items 中拿走该资源所有的事件,最后调用 Pop 方法传入的 PopProcessFunc 类型的处理函数。

因此,DeltaFIFO 的特点在于,入队列的是(资源的)事件,而出队列时是拿到的是最早入队列的资源的所有事件。这样的设计保证了不会因为有某个资源疯狂的制造事件,导致其他资源没有机会被处理而产生饥饿。

controller

DeltaFIFO 是一个非常重要的组件,真正让他发挥价值的,便是 Informer 的 controller

虽然 K8s 源码中的确用的是 controller 这个词,但是此 controller 并不是 Deployment Controller 这种资源控制器。而是一个承上启下的事件控制器(从 API Server 拿到事件,下发给 Informer 进行处理)。

controller 的职责就两个:

  1. 通过 List-Watch 从 Api Server 获得事件、并将该事件推入 DeltaFIFO 中
  2. sharedIndexInformerHandleDeltas 方法作为参数,来调用 DeltaFIFO 的 Pop 方法

controller 的定义非常简单,它的核心就是 Reflector

type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}

Reflector 的代码比较繁琐但是功能比较简单,就是通过 sharedIndexInformer 里定义的 listerWatcher 进行 List-Watch,并将获得的事件推入 DeltaFIFO 中。

controller 启动之后会先将 Reflector 启动,然后在执行 processLoop,通过一个死循环,不停的将从 DeltaFIFO 读出需要处理的资源事件,并交给 sharedIndexInformerHandleDeltas 方法(创建 controller 时赋值给了 config.Process)。

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

如果我们再查看下 sharedIndexInformer 的 HandleDeltas 方法,就会发现整个事件消费流程被打通了:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

前面我们知道了 processor.distribute 方法可以将事件分发给所有 listener,而 controller 会使用 Reflector 从 ApiServer 拿到事件,并入队列,然后通过 processLoop 从队列中拿出要处理的资源的所有事件,最后通过 sharedIndexInformerHandleDeltas 方法,调用了 processor.distribute

因此,我们可以将整个事件流向整理为下图:

Indexer

以上,我们将事件从接收到分发,中间所有的逻辑已经梳理了一遍,但是在 sharedIndexInformer 的 HandleDeltas 方法中,还有一些逻辑比较令人注意,就是所有的事件都会先对 s.indexer 进行更新,然后在分发。

前面提到 Indexer 是一个线程安全的存储,作为缓存使用,为了减轻资源控制器(Controller)查询资源时对 ApiServer 的压力。

当有任何事件更新时,会先刷新 Indexer 里的缓存,然后再将事件分发给资源控制器,资源控制器在需要获得资源详情的时候,优先从 Indexer 获得,就可以减少对 APIServer 不必要的查询请求。

Indexer 存储的具体实现在 client-go/tools/cache/thread_safe_store.go 中,数据存储在 threadSafeMap 中:

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

从本质上讲,threadSafeMap 就是加了一个读写锁的 map。除此之外,还可以定义索引,索引的实现非常有趣,通过两个字段完成:

  1. Indexers 是一个 map,定义了若干求索引函数,key 为 indexName,value 为求索引的函数(计算资源的索引值)。
  2. Indices 则保存了索引值和数据 key 的映射关系,Indices 是一个两层的 map,第一层的 key 为 indexName,和 Indexers 对应,确定使用什么方法计算索引值,value 是一个 map,保存了 “索引值-资源key” 的关联关系。

相关逻辑比较简单,可以参考下图:

MutationDetector

sharedIndexInformerHandleDeltas 方法中,除了向 s.indexer 更新的数据之外,还向 s.cacheMutationDetector 更新了数据。

在一开始讲到 sharedIndexInformer 启动时还会启动一个 cacheMutationDetector,来监控 indexer 的缓存。

因为 indexer 缓存的其实是一个指针,多个 Controller 访问 indexer 缓存的资源,其实获得的是同一个资源实例。如果有一个 Controller 并不本分,修改了资源的属性,势必会影响到其他 Controller 的正确性。

MutationDetector 的作用正是定期检查有没有缓存被修改,当 Informer 接收到新事件时,MutationDetector 会保存该资源的指针(和 indexer 一样),以及该资源的深拷贝。通过定期检查指针指向的资源和开始存储的深拷贝是否一致,便知道被缓存的资源是否被修改。

不过,具体是否启用监控是受到环境变量 KUBE_CACHE_MUTATION_DETECTOR 影响的,如果不设置该环境变量,sharedIndexInformer 实例化的是 dummyMutationDetector,在启动后什么事情也不做。

如果 KUBE_CACHE_MUTATION_DETECTOR 为 true,则 sharedIndexInformer 实例化的是 defaultCacheMutationDetector,该实例会以 1s 为间隔,定期执行检查缓存,如果发现缓存被修改,则会触发一个失败处理函数,如果该函数没被定义,则会触发一个 panic。

总结

本文讲解的应该算是狭义的 Controller Manager,毕竟没有包含具体的资源管理器(Controller),而只是讲解 Controller Manager 是怎么 “Manage Controller” 的。

可以看到 Controller Manager 做了很多工作来保证 Controller 可以只专注于处理自己关心的事件,而这些工作的核心就是 Informer,当理解了 Informer 是如何与其他组件协同工作,那么 Controller Manager 为资源管理器铺垫了什么也就了然了。

拓展

2019/12/15 14:23 下午 posted in  Kubernetes

使用 Kubernetes 联邦(Kubefed)进行多集群管理

前一篇文章 《Kubernetes 多集群管理:Kubefed(Federation v2)》对 Federation v2 的基本概念和工作原理简单介绍,本文着重介绍 Kubefed 的使用。

本文的实验环境采用 v0.1.0-rc6 进行。

$ kubefedctl version
kubefedctl version: version.Info{Version:"v0.1.0-rc6", GitCommit:"7586b42f4f477f1912caf28287fa2e0a7f68f407", GitTreeState:"clean", BuildDate:"2019-08-17T03:55:05Z", GoVersion:"go1.12.5", Compiler:"gc", Platform:"linux/amd64"}

安装

Federation v2 的安装分两个部分,一是 Controller Plan 和 kubefedctl。

Controller Plan

Controller Plan 可以使用 Helm 部署(目前 Helm 还是使用 v2 版本),参考官方安装文档:https://github.com/kubernetes-sigs/kubefed/blob/master/charts/kubefed/README.md

添加 helm repo:

$ helm repo add kubefed-charts https://raw.githubusercontent.com/kubernetes-sigs/kubefed/master/charts

$ helm repo list
NAME            URL
kubefed-charts   https://raw.githubusercontent.com/kubernetes-sigs/kubefed/master/charts

找到目前的版本:

$ helm search kubefed
NAME                            CHART VERSION   APP VERSION DESCRIPTION
kubefed-charts/kubefed          0.1.0-rc6                   KubeFed helm chart
kubefed-charts/federation-v2    0.0.10                      Kubernetes Federation V2 helm chart

然后使用 helm 直接安装最新版本即可:

$ helm install kubefed-charts/kubefed --name kubefed --version=0.1.0-rc6 --namespace kube-federation-system

kubefedctl

kubefedctl 是一个二进制程序,可以在 Github 的 Release 页面找到最新版本的下载地址:https://github.com/kubernetes-sigs/kubefed/releases

$ wget https://github.com/kubernetes-sigs/kubefed/releases/download/v0.1.0-rc6/kubefedctl-0.1.0-rc6-linux-amd64.tgz

$ tar -zxvf kubefedctl-0.1.0-rc6-linux-amd64.tgz

$ mv kubefedctl /usr/local/bin/

kubefedctl 提供了很多便捷操作,比如集群注册、资源注册等。

多集群管理

可以使用 kubefedctl join 命令接入新集群,在接入之前,需要先将多个集群信息配置在本地的 kubeconfig 中。

基本使用方式为:

kubefedctl join <集群名称> --cluster-context <要接入集群的 context 名称> --host-cluster-context <HOST 集群的 context>

比如:

kubefedctl join cluster1 --cluster-context cluster1 \
    --host-cluster-context cluster1 --v=2
kubefedctl join cluster2 --cluster-context cluster2 \
    --host-cluster-context cluster1 --v=2

Kubefed 是利用 CR 来存储自己所需要的数据,因此当使用 kubefedctl join 后,可以在 host cluster 查看到集群信息:

$ kubectl -n kube-federation-system get kubefedclusters
NAME       READY   AGE
cluster1   True    3d22h
cluster2   True    3d22h
cluster3   True    3d22h

kubefedctl join 命令只是将 Kubeconfig 里的配置转化为 KubeFedCluster 自定义资源存储到 kube-federation-system 命名空间中:

$ kubectl -n kube-federation-system get kubefedclusters cluster1 -o yaml
apiVersion: core.kubefed.io/v1beta1
kind: KubeFedCluster
metadata:
  creationTimestamp: "2019-10-24T08:05:38Z"
  generation: 1
  name: cluster1
  namespace: kube-federation-system
  resourceVersion: "647452"
  selfLink: /apis/core.kubefed.io/v1beta1/namespaces/kube-federation-system/kubefedclusters/cluster1
  uid: 4c5eb57f-5ed4-4cec-89f3-cfc062492ae0
spec:
  apiEndpoint: https://172.16.200.1:6443
  caBundle: LS....Qo=
  secretRef:
    name: cluster1-shb2x
status:
  conditions:
  - lastProbeTime: "2019-10-28T06:25:58Z"
    lastTransitionTime: "2019-10-28T05:13:47Z"
    message: /healthz responded with ok
    reason: ClusterReady
    status: "True"
    type: Ready
  region: ""

资源

Federation v1 的淘汰的原因之一便是对资源拓展比较死板(需要拓展 API Server)而且没有预料的 CRD 的大规模应用,因此 Federation v2 在资源管理上面做的非常灵活。

对于 KubeFed 来说,资源管理分两类,一是资源的类型管理,另一个是被联邦(federated)的资源管理。

对于资源类型,kubefedctl 提供了 enable 来使新的资源可以被联邦管理:

kubefedctl enable <target kubernetes API type>

其中 可以使用以下的描述:

  • 类型,即 Kind (比如 Deployment)
  • 复数名词 (比如 deployments)
  • 带 api group 的复数资源名词 (比如 deployment.apps)
  • 缩写 (比如 deploy)

比如我们需要把 istio 中的 VirtualService 资源交给联邦管理,可以使用:

kubefedctl enable VirtualService

因为 Kubefed 是通过 CRD 管理资源,因此,当 enable 执行之后可以看到 Host Cluster 中新增了一种名为 federatedvirtualservices 的 CRD:

$ kubectl get crd | grep virtualservice
federatedvirtualservices.types.kubefed.io            2019-10-24T13:12:46Z
virtualservices.networking.istio.io                  2019-10-24T08:06:01Z

该 CRD 里面描述了 federatedvirtualservices 类型的必需字段,比如:placementoverrides 等。

kubefedctl enable 完成了资源类型的管理,对于需要被联邦的资源管理编辑基于新创建的 CRD 展开的。不过要部署资源之前,需要先创建 federatednamespaces ,多集群的资源只会部署到被 kubefed 管理的 namespace 中:

$ kubectl get federatednamespaces
NAME      AGE
default   3d21h

这里尝试创建一个 federatedvirtualservices 类型的资源:

$ kubectl get federatedvirtualservices
NAME            AGE
service-route   3d4h

完整 yaml:

apiVersion: types.kubefed.io/v1beta1
kind: FederatedVirtualService
metadata:
  name: service-route
  namespace: default
spec:
  placement:
    clusters:
    - name: cluster1
    - name: cluster2
    - name: cluster3
  template:
    metadata:
      name: service-route
    spec:
      gateways:
      - service-gateway
      hosts:
      - '*'
      http:
      - match:
        - uri:
            prefix: /
        route:
        - destination:
            host: service-a-1
            port:
              number: 3000

这时,Kubefed 会根据 template 里的描述为目标集群创建对应的 virtualservice 资源。

$ kubectl get virtualservices
NAME            GATEWAYS            HOSTS   AGE
service-route   [service-gateway]   [*]     3d4h

调度

Kubefed 目前只能做到一些简单的集群间调度,即手工指定。

对于手工指定的调度方式主要分为两部分,一是直接在资源中制定目的地,二是通过 ReplicaSchedulingPreference 进行比例分配。

对于每个被联邦的资源来说,都有一个 placement 字段用来描述将要部署在哪个集群,可以从 CRD 的描述中了解其定义思路:

placement:
  properties:
    clusterSelector:
      properties:
        matchExpressions:
          items:
            properties:
              key:
                type: string
              operator:
                type: string
              values:
                items:
                  type: string
                type: array
            required:
            - key
            - operator
            type: object
          type: array
        matchLabels:
          additionalProperties:
            type: string
          type: object
      type: object
    clusters:
      items:
        properties:
          name:
            type: string
        required:
        - name
        type: object
      type: array
  type: object

使用示例如下,可以通过 clusters 指定一个 cluster 列表,或者通过 clusterSelector 来根据集群标签选择集群:

spec:
  placement:
    clusters:
      - name: cluster2
      - name: cluster1
    clusterSelector:
      matchLabels:
        foo: bar

不过有两点要注意:

  1. 如果 clusters 字段被指定,clusterSelector 将会被忽略
  2. 被选择的集群是平等的,该资源会在每个被选中的集群中部署一个无差别副本

如果需要在多个集群间进行区别调度的话就需要引入 ReplicaSchedulingPreference 进行按比例的调度了。

ReplicaSchedulingPreference 定义了包括多个和调度相关的字段来描述调度策略:

apiVersion: scheduling.kubefed.io/v1alpha1
kind: ReplicaSchedulingPreference
metadata:
  name: test-deployment
  namespace: test-ns
spec:
  targetKind: FederatedDeployment
  totalReplicas: 9
  clusters:
    A:
      minReplicas: 4
      maxReplicas: 6
      weight: 1
    B:
      minReplicas: 4
      maxReplicas: 8
      weight: 2
  • totalReplicas 定义了总副本数
  • clusters 描述不同集群的 最大\最小 副本以及权重

Kubefed 会根据调度策略的定义来进行维护不同集群的副本数,具体细节可以参考文档:( https://github.com/kubernetes-sigs/kubefed/blob/master/docs/userguide.md#replicaschedulingpreference )。

网络

Kubefed 还有一个亮点功能是跨集群间的网络访问。Kubefed 通过引入外部 DNS,将 Ingress Controller 和 metallb 等外部 LB 结合起来,使跨集群的流量可配置。

以 Ingress 举例,用户可以创建 IngressDNSRecord 类型的资源,并指定域名,Kubefed 将会根据 IngressDNSRecord 自定配置相关的 DNS 策略,并应用到外部服务器中。

创建 IngressDNSRecord 类型的资源:

apiVersion: multiclusterdns.kubefed.io/v1alpha1
kind: IngressDNSRecord
metadata:
  name: test-ingress
  namespace: test-namespace
spec:
  hosts:
  - ingress.example.com
  recordTTL: 300

DNS Endpoint controller 会生成相关的 DNSEndpoint

$ kubectl -n test-namespace get dnsendpoints -o yaml
apiVersion: v1
items:
- apiVersion: multiclusterdns.kubefed.io/v1alpha1
  kind: DNSEndpoint
  metadata:
    creationTimestamp: 2018-10-10T20:37:38Z
    generation: 1
    name: ingress-test-ingress
    namespace: test-namespace
    resourceVersion: "251874"
    selfLink: /apis/multiclusterdns.kubefed.io/v1alpha1/namespaces/test-namespace/dnsendpoints/ingress-test-ingress
    uid: 538d1063-cccc-11e8-bebb-42010a8a00b8
  spec:
    endpoints:
    - dnsName: ingress.example.com
      recordTTL: 300
      recordType: A
      targets:
      - $CLUSTER1_INGRESS_IP
      - $CLUSTER2_INGRESS_IP
  status: {}
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""

ExternalDNS controller 会监听 DNSEndpoint 资源,收到事件后会将该记录应用到 DNS 服务器上,如果成员集群的内部 DNS 服务器使用该外部 DNS 服务器作为上游服务器,那么成员集群可以直接访问对于域名就可以实现跨集群访问。

部署

官方仓库中有完整的实例以供实验,可以参考:https://github.com/kubernetes-sigs/kubefed/tree/master/example

除了调度之外,Kubefed 通过 overrides 字段实现不同集群间的差异化部署:

apiVersion: types.kubefed.io/v1beta1
kind: FederatedDeployment
metadata:
  name: test-deployment
  namespace: test-namespace
spec:
  template:
    metadata:
      labels:
        app: nginx
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: nginx
      template:
        metadata:
          labels:
            app: nginx
        spec:
          containers:
          - image: nginx
            name: nginx
  placement:
    clusters:
    - name: cluster2
    - name: cluster1
  overrides:
  - clusterName: cluster2
    clusterOverrides:
    - path: "/spec/replicas"
      value: 5
    - path: "/spec/template/spec/containers/0/image"
      value: "nginx:1.17.0-alpine"
    - path: "/metadata/annotations"
      op: "add"
      value:
        foo: bar
    - path: "/metadata/annotations/foo"
      op: "remove"

对该 Deployment 部署之后,可以通过 kubectl describe 查看部署状态:

$ kubectl describe federateddeployment.types.kubefed.io/test-deployment
Name:         test-deployment
Namespace:    default
Labels:       <none>
Annotations:  kubectl.kubernetes.io/last-applied-configuration:
                {"apiVersion":"types.kubefed.io/v1beta1","kind":"FederatedDeployment","metadata":{"annotations":{},"name":"test-deployment","namespace":"d...
API Version:  types.kubefed.io/v1beta1
Kind:         FederatedDeployment
Metadata:
  Creation Timestamp:  2019-10-28T07:55:34Z
  Finalizers:
    kubefed.io/sync-controller
  Generation:        1
  Resource Version:  657714
  Self Link:         /apis/types.kubefed.io/v1beta1/namespaces/default/federateddeployments/test-deployment
  UID:               6016a3eb-7e7f-4756-ba40-b655581f06ad
Spec:
  Overrides:
    Cluster Name:  cluster2
    Cluster Overrides:
      Path:   /spec/replicas
      Value:  5
      Path:   /spec/template/spec/containers/0/image
      Value:  nginx:1.17.0-alpine
      Op:     add
      Path:   /metadata/annotations
      Value:
        Foo:  bar
      Op:     remove
      Path:   /metadata/annotations/foo
  Placement:
    Clusters:
      Name:  cluster2
      Name:  cluster1
  Template:
    Metadata:
      Labels:
        App:  nginx
    Spec:
      Replicas:  3
      Selector:
        Match Labels:
          App:  nginx
      Template:
        Metadata:
          Labels:
            App:  nginx
        Spec:
          Containers:
            Image:  nginx
            Name:   nginx
Status:
  Clusters:
    Name:  cluster1
    Name:  cluster2
  Conditions:
    Last Transition Time:  2019-10-28T07:55:35Z
    Last Update Time:      2019-10-28T07:55:49Z
    Status:                True
    Type:                  Propagation
Events:
  Type    Reason           Age                From                            Message
  ----    ------           ----               ----                            -------
  Normal  CreateInCluster  14s                federateddeployment-controller  Creating Deployment "default/test-deployment" in cluster "cluster2"
  Normal  CreateInCluster  14s                federateddeployment-controller  Creating Deployment "default/test-deployment" in cluster "cluster1"
  Normal  UpdateInCluster  0s (x10 over 12s)  federateddeployment-controller  Updating Deployment "default/test-deployment" in cluster "cluster2"

以及可以看到,在不同集群间的差异:

$ kubectl --context=cluster1 get deploy | grep test
test-deployment   3/3     3            3           98s

$ kubectl --context=cluster2 get deploy | grep test
test-deployment   5/5     5            5           105s
2019/10/23 17:32 下午 posted in  Kubernetes

Kubernetes 多集群管理:Kubefed(Federation v2)

Kubefed(Federation v2)即 Kubernetes 联邦,是目前社区正在难产的多集群解决方案,目前的版本是 0.1.0,如果考虑到 Federation v1 的话,Kubefed 也算是有个出师未捷身先死的大兄弟了。

Federation v1 为什么被弃用

Federation v1 在Kubernetes v1.6 时进入了 Beta 阶段,但之后就没有更进一步的发展,一直到 Kubernetes v1.11 左右正式被弃用。至于被废弃的原因是因为开发团队认为集群联邦的实践比想象中还要困难,有许多问题是 v1 架构没被考虑进去的,比如:

  • 控制平面组件会因为发生问题,而影响整体集群效率。
  • 无法兼容新的 Kubernetes API 资源。
  • 无法有效的在多个集群管理权限,如不支持 RBAC。
  • 联邦层级的设定与策略依赖 API 资源的 Annotations 内容,这使得弹性不佳。

从 Federation v1 架构上看,Federation 主要由 API Server、Controller Manager 和外部存储 etcd 构成。

Federation API Server 基本复用了 Kube Api Server,对外提供统一的资源管理入口,但只允许使用 Adapter 拓展支持的 Kubernetes 资源。

Controller Manager 协调不同集群之间的状态,通过与成员集群的 Api Server 通讯,来统筹管理所有的 Kubernetes 成员集群。

Federation v1 整体的架构和 Kubernetes 自身的架构还是很像的,并将成员集群作为一种资源进行管理。但是因为 v1 一开始并没有设计到灵活的添加新 Kubernetes 资源以及 CRD,以至于每当创建一种新资源都要新增 Adapter。

本来资源设计的就非常不灵活,加之 RBAC 的支持问题,使得无法做到多集群资源的权限管理,因而流产,并为 v2 积累了宝贵的教训。

Federation v2 设计

Federation v2 利用 CRD 实现了整体功能,通过定义多种自定义资源(CR),从而省掉了 v1 的 API Server,但也因此引入了 Host Cluster 的概念。

基本概念

  • Federate:联邦(Federate)是指联结一组 Kubernetes 集群,并为其提供公共的跨集群部署和访问接口
  • KubeFed:Kubernetes Cluster Federation,为用户提供跨集群的资源分发、服务发现和高可用
  • Host Cluster:部署 Kubefed API 并允许 Kubefed Control Plane
  • Cluster Registration:通过 kubefedctl join 使得成员集群加入到主集群(Host Cluster)
  • Member Cluster:通过 KubeFed API 注册为成员并受 KubeFed 管理的集群,主集群(Host Cluster)也可以作为成员集群(Member Cluster)
  • ServiceDNSRecord: 记录 Kubernetes Service 信息,并通过 DNS 使其可以跨集群访问
  • IngressDNSRecord:记录 Kubernetes Ingress 信息,并通过 DNS 使其可以跨集群访问
  • DNSEndpoint:一个记录(ServiceDNSRecord/IngressDNSRecord 的) Endpoint 信息的自定义资源

架构

虽然 Federation v2 在设计上做了非常大的变更并省掉了 API Server ,但总体架构变动并不大,当将 Federation Control Plan 部署完成之后可以看到由两个组件构成:

$ kubectl -n kube-federation-system get deploy
NAME                         READY   UP-TO-DATE   AVAILABLE   AGE
kubefed-admission-webhook    1/1     1            1            3s
kubefed-controller-manager   2/2     2            2            3s

admission-webhook 提供了准入控制,controller-manager 处理自定义资源以及协调不同集群间的状态。

工作原理

在逻辑上,Federation v2 分为两个大部分:configuration 和 propagation。

configuration 的设计明显吸取了 v1 的教训,将很多会变化的内容配置化,configuration 主要包含两个配置:

  • Type configuration:用来描述将被联邦托管的资源类型
  • Cluster configuration:用来保存被联邦托管的集群的 API 认证信息

对于 Type configuration,联邦 v2 是下足了功夫,包含三个关键部分:

  • Templates 用于描述被联邦的资源
  • Placement 用来描述将被部署的集群
  • Overrides 允许对部分集群的部分资源进行覆写

以上基本上完成了资源的定义并为 propagation 提供了资源描述。除此之外,Federation v2 还支持定义部署策略和调度规则,实现更精细的管理。

使用

使用请参考:《使用 Kubernetes 联邦(Kubefed)进行多集群管理

参考

2019/10/21 16:28 下午 posted in  Kubernetes

Kubernetes 是怎么实现定时任务的

Kubernetes 的各个组件都有一定的定时任务,比如日志的处理、任务的查询、缓存的使用等。Kubernetes 中的定时任务都是通过 wait 包实现的,比如在 Kubelet 中启动探针的检查:

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
    // Start syncing readiness.
    go wait.Forever(m.updateReadiness, 0)
    // Start syncing startup.
    go wait.Forever(m.updateStartup, 0)
}

Golang 的定时任务

在讲 Kubernetes 的 wait 库之前,先看下 Golang 应该怎么实现一个定时任务。

Golang 中的 time 库包含了很多和时间相关的工具,其中包括了 Ticker 和 Timer 两个定时器。

Ticker 只要完成定义,从计时开始,不需要其他操作,每间隔固定时间便会触发。

package main

import (
    "fmt"
    "time"
)

func main() {

    d := time.Duration(time.Second * 5)
    ticker := time.NewTicker(d)

    defer ticker.Stop()

    for {
        <-ticker.C

        fmt.Println("Hello World")
    }

}

而对于 Timer,在超时之后需要重置才能继续触发。

package main

import (
    "fmt"
    "time"
)

func main() {

    d := time.Duration(time.Second * 5)
    timer := time.NewTimer(d)

    defer timer.Stop()

    for {
        <-timer.C

        fmt.Println("Hello World")
        timer.Reset(d)
    }

}

需要注意的,无论哪种计时器,.C 都是一个 chan Time 类型且容量为 1 的单向 Channel,当有超过 1 个数据的时候便会被阻塞,以此保证不会被触发多次。

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{
            when: when(d),
            f:    sendTime,
            arg:  c,
        },
    }
    startTimer(&t.r)
    return t
}

Kubernetes 的 wait 库

常用 API

wait 库中实现了多种常用的 API,以提供定时执行函数的能力。

定期执行一个函数,永不停止

// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration)

该函数支持一个函数参数和一个间隔时间,该函数会定期执行,不会停止。

定期执行一个函数,可以接受停止信号

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 

该函数支持提供一个函数、间隔时间和发生 stop 信号的 channel,和 Forever 类似,不过可以通过向 stopCh 发布消息来停止。

定期检查先决条件

// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error

该函数将以 interval 为间隔,定期检查 condition 是否检查成功。

定期检查先决条件,直到检查成功或停止

// PollUntil tries a condition func until it returns true, an error or stopCh is
// closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

核心代码

wait 库的定时任务 API 是基于 JitterUntil 实现的。

// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
    var t *time.Timer
    var sawTimeout bool

    for {
        select {
        case <-stopCh:
            return
        default:
        }

        jitteredPeriod := period
        if jitterFactor > 0.0 {
            jitteredPeriod = Jitter(period, jitterFactor)
        }

        if !sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        func() {
            defer runtime.HandleCrash()
            f()
        }()

        if sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        // NOTE: b/c there is no priority selection in golang
        // it is possible for this to race, meaning we could
        // trigger t.C and stopCh, and t.C select falls through.
        // In order to mitigate we re-check stopCh at the beginning
        // of every loop to prevent extra executions of f().
        select {
        case <-stopCh:
            return
        case <-t.C:
            sawTimeout = true
        }
    }
}

JitterUntil 的 5 个参数:

参数名 类型 作用
f func() 需要定时执行的逻辑函数
period time.Duration 定时任务的时间间隔
jitterFactor float64 如果大于 0.0 间隔时间变为 duration 到 duration + maxFactor * duration 的随机值
sliding bool 逻辑的执行时间是否不算入间隔时间
stopCh <-chan struct{} 接受停止信号的 channel
2019/10/11 14:51 下午 posted in  Kubernetes

无需 Daemon 进程的容器工具:Podman

什么是容器

Linux 容器技术

Linux 容器是由 Linux Kernel 提供的具有特定隔离的进程。Linux 容器技术能够让用户对应用及其整个运行时环境(包括全部所需文件)一起进行打包或隔离。从而让用户在不同环境,之间轻松迁移应用的同时,并保留应用的全部功能。

Docker 的问题

一提到容器技术,肯定无法绕开 Docker,Docker 是一个著名的开源容器引擎,在容器技术已经在逐步普及的现在,Docker 几乎也成了容器的代名词。

Docker 本身也是 Linux 容器技术的一种封装,通过并向用户提供简易的接口,使用户非常方便的打包和使用容器。

作为目前主流的容器引擎,Docker 有着丰富的使用场景和解决方案,但也有一些问题。

  1. Docker 需要运行一个守护进程,所有容器都是守护进程的子进程
  2. Docker 需要 root 身份运行守护进程

看起来这仿佛没有什么问题,但是如果你尝试大规模使用 Docker 你会发现:

  1. 守护进程并没有想象中的稳定
  2. 一个容器的 OOM 很可能会拖累到父进程从而影响邻居容器
  3. Docker 进程树会有些奇奇怪怪的现象,你无法确定是 Docker 的 bug 还是 Kernel 做了什么
  4. Docker 容器是 root 启动的进程

如果换个方向,守护进程真的有必要吗?

什么是Podman

Podman 曾是 CRI-O project 中的一部分,后来被分离出成为一个独立的项目:libpod( https://github.com/containers/libpod )。Podman 的目标是提供一个和 Docker 相似的 Container CLI(甚至官方直接建议使用:alias docker=podman)。

安装

安装 Podman 非常简单,安装文档:https://github.com/containers/libpod/blob/master/install.md

MacOS

Using Homebrew:

brew cask install podman

Fedora, CentOS

sudo yum -y install podman

Ubuntu(development versions)

sudo apt-get update -qq
sudo apt-get install -qq -y software-properties-common uidmap
sudo add-apt-repository -y ppa:projectatomic/ppa
sudo apt-get update -qq
sudo apt-get -qq -y install podman

使用

下文实验基于 Podman V1.4.4 进行。

# podman version
Version:            1.4.4
RemoteAPI Version:  1
Go Version:         go1.10.3
OS/Arch:            linux/amd64

拉取镜像

Podman 会默认先拉取 registry.access.redhat.com 的镜像,因为众所周知的原因,国内是无法正常拉取的,但拉取失败之后 Podman 会再尝试 docker.io 的镜像:

# podman pull nginx
Trying to pull registry.access.redhat.com/nginx...ERRO[0001] Error pulling image ref //registry.access.redhat.com/nginx:latest: Error initializing source docker://registry.access.redhat.com/nginx:latest: Error reading manifest latest in registry.access.redhat.com/nginx: name unknown: Repo not found
Failed
Trying to pull docker.io/library/nginx...Getting image source signatures
Copying blob 7acba7289aa3 done
Copying blob b8f262c62ec6 done
Copying blob e9218e8f93b1 done
Copying config f949e7d76d done
Writing manifest to image destination
Storing signatures
f949e7d76d63befffc8eec2cbf8a6f509780f96fb3bacbdc24068d594a77f043

Podman 的数据路径在 /var/lib/containers 下,和 Docker 类似,保存了 layer 等数据。

[root@podman-test-vm lib]# tree /var/lib/containers/ -L 2
/var/lib/containers/
├── cache
│   └── blob-info-cache-v1.boltdb
└── storage
    ├── libpod
    ├── mounts
    ├── overlay
    ├── overlay-containers
    ├── overlay-images
    ├── overlay-layers
    ├── storage.lock
    └── tmp

可以看到刚才拉取的镜像信息:

[root@podman-test-vm lib]# cat /var/lib/containers/storage/overlay-images/images.json | python -m json.tool
[
    {
        "big-data-digests": {
            "manifest": "sha256:066edc156bcada86155fd80ae03667cf3811c499df73815a2b76e43755ebbc76",
            "manifest-sha256:066edc156bcada86155fd80ae03667cf3811c499df73815a2b76e43755ebbc76": "sha256:066edc156bcada86155fd80ae03667cf3811c499df73815a2b76e43755ebbc76",
            "sha256:f949e7d76d63befffc8eec2cbf8a6f509780f96fb3bacbdc24068d594a77f043": "sha256:f949e7d76d63befffc8eec2cbf8a6f509780f96fb3bacbdc24068d594a77f043"
        },
        "big-data-names": [
            "sha256:f949e7d76d63befffc8eec2cbf8a6f509780f96fb3bacbdc24068d594a77f043",
            "manifest-sha256:066edc156bcada86155fd80ae03667cf3811c499df73815a2b76e43755ebbc76",
            "manifest"
        ],
        "big-data-sizes": {
            "manifest": 948,
            "manifest-sha256:066edc156bcada86155fd80ae03667cf3811c499df73815a2b76e43755ebbc76": 948,
            "sha256:f949e7d76d63befffc8eec2cbf8a6f509780f96fb3bacbdc24068d594a77f043": 6669
        },
        "created": "2019-09-24T23:33:17.034191345Z",
        "digest": "sha256:066edc156bcada86155fd80ae03667cf3811c499df73815a2b76e43755ebbc76",
        "id": "f949e7d76d63befffc8eec2cbf8a6f509780f96fb3bacbdc24068d594a77f043",
        "layer": "ea345052c98934e4e4673b2d359b5000a9ff1cc7f0332df0d406980f172deea6",
        "metadata": "{}",
        "names": [
            "docker.io/library/nginx:latest"
        ]
    }
]

启动容器

Podman 绝大多数命令和 Docker 兼容,因此可以使用类似的方式启动容器:

[root@podman-test-vm ~]# podman run -p 80:80 --name=web -d nginx
9d284597eeedbbdfb4df933e063fe1035cbd39f1e712173f7a8a3652773eac02

[root@podman-test-vm ~]# podman ps
CONTAINER ID  IMAGE                           COMMAND               CREATED         STATUS             PORTS               NAMES
9d284597eeed  docker.io/library/nginx:latest  nginx -g daemon o...  48 seconds ago  Up 48 seconds ago  0.0.0.0:80->80/tcp  web

尝试检查 nginx 的进程:

[root@podman-test-vm ~]# ps -ef | grep nginx
root      2518  2508  0 12:01 ?        00:00:00 nginx: master process nginx -g daemon off;
101       2529  2518  0 12:01 ?        00:00:00 nginx: worker process
root      2637  1259  0 12:10 pts/0    00:00:00 grep --color=auto nginx

然后根据 pid 查看进程树:

[root@podman-test-vm ~]# pstree -H 2518
systemd─┬─NetworkManager─┬─2*[dhclient]
        │                └─2*[{NetworkManager}]
        ├─anacron
        ├─auditd───{auditd}
        ├─conmon─┬─nginx───nginx
        │        └─{conmon}
        ├─crond
        ├─dbus-daemon───{dbus-daemon}
        ├─firewalld───{firewalld}
        ├─login───bash
        ├─lvmetad
        ├─master─┬─pickup
        │        └─qmgr
        ├─polkitd───5*[{polkitd}]
        ├─rsyslogd───2*[{rsyslogd}]
        ├─sshd───sshd───bash───pstree
        ├─systemd-journal
        ├─systemd-logind
        ├─systemd-udevd
        └─tuned───4*[{tuned}]

根据 ppid 查看父进程:

[root@podman-test-vm ~]# ps -ef | grep 2508
root      2508     1  0 12:01 ?        00:00:00 /usr/libexec/podman/conmon -s -c 9d284597eeedbbdfb4df933e063fe1035cbd39f1e712173f7a8a3652773eac02 -u 9d284597eeedbbdfb4df933e063fe1035cbd39f1e712173f7a8a3652773eac02 -n web -r /usr/bin/runc -b /var/lib/containers/storage/overlay-containers/9d284597eeedbbdfb4df933e063fe1035cbd39f1e712173f7a8a3652773eac02/userdata -p /var/run/containers/storage/overlay-containers/9d284597eeedbbdfb4df933e063fe1035cbd39f1e712173f7a8a3652773eac02/userdata/pidfile --exit-dir /var/run/libpod/exits --exit-command /usr/bin/podman --exit-command-arg --root --exit-command-arg /var/lib/containers/storage --exit-command-arg --runroot --exit-command-arg /var/run/containers/storage --exit-command-arg --log-level --exit-command-arg error --exit-command-arg --cgroup-manager --exit-command-arg systemd --exit-command-arg --tmpdir --exit-command-arg /var/run/libpod --exit-command-arg --runtime --exit-command-arg runc --exit-command-arg --storage-driver --exit-command-arg overlay --exit-command-arg container --exit-command-arg cleanup --exit-command-arg 9d284597eeedbbdfb4df933e063fe1035cbd39f1e712173f7a8a3652773eac02 --socket-dir-path /var/run/libpod/socket -l k8s-file:/var/lib/containers/storage/overlay-containers/9d284597eeedbbdfb4df933e063fe1035cbd39f1e712173f7a8a3652773eac02/userdata/ctr.log --log-level error
root      2518  2508  0 12:01 ?        00:00:00 nginx: master process nginx -g daemon off;
root      2639  1259  0 12:10 pts/0    00:00:00 grep --color=auto 2508

可以看到,podman 通过 podman/conmon 启动了容器,而且这个进程被挂在到 pid 1 也就是 systemd 下面。

podman/conmon 是 Podman 的启动器,主要负责两个功能,一是监控 runc,利用 runc 的能力管理容器,而是和 Podman 建立通讯,并传递对容器操作的指令。

Podman does not communicate with using the CRI protocol. Instead, Podman creates containers using runc, and manages storage using containers/storage. Technically, Podman launches conmon which launches and monitors the OCI Runtime (runc). Podman can exit and later reconnect to conmon to talk to the container. Runc stops running once the container starts.

《Crictl Vs Podman》:https://blog.openshift.com/crictl-vs-podman/

构建镜像

Podman 可以直接使用 Dockerfile 进行构建:

[root@podman-test-vm ~]# git clone https://github.com/DaoCloud/dao-2048.git
正克隆到 'dao-2048'...
remote: Enumerating objects: 116, done.
remote: Total 116 (delta 0), reused 0 (delta 0), pack-reused 116
接收对象中: 100% (116/116), 304.76 KiB | 60.00 KiB/s, done.
处理 delta 中: 100% (40/40), done.
[root@podman-test-vm ~]# podman build dao-2048/
STEP 1: FROM daocloud.io/nginx:1.11-alpine
Getting image source signatures
Copying blob ed383a1b82df done
Copying blob c92260fe6357 done
Copying blob 4b21d71b440a done
Copying blob 709515475419 done
Copying config bedece1f06 done
Writing manifest to image destination
Storing signatures
STEP 2: MAINTAINER Golfen Guo <golfen.guo@daocloud.io>
711d32f788782528ad36a0c12ae895993474b168f7f2d65158e531a924b3dd55
STEP 3: COPY . /usr/share/nginx/html
b859763deeb7eaab39fd8c34a8c8af18e8de74c80e98f9fcb1e0c694881c5e9c
STEP 4: EXPOSE 80
eedb1a66c8316a309546b21a5c687f3e3611927b54a7fa0f4dbd3f175eeb253c
STEP 5: CMD sed -i "s/ContainerID: /ContainerID: "$(hostname)"/g" /usr/share/nginx/html/index.html && nginx -g "daemon off;"
STEP 6: COMMIT
fda5a2d14a918c3eb088c28dc0d8e89e66b061923a82e8722d9e2a62c994422d

[root@podman-test-vm ~]# podman images
REPOSITORY                TAG           IMAGE ID       CREATED          SIZE
<none>                    <none>        fda5a2d14a91   22 seconds ago   56.9 MB
docker.io/library/nginx   latest        f949e7d76d63   4 days ago       130 MB
daocloud.io/nginx         1.11-alpine   bedece1f06cc   2 years ago      55.9 MB

Dockerfile:

[root@podman-test-vm ~]# cat dao-2048/Dockerfile
# Using a compact OS
FROM daocloud.io/nginx:1.11-alpine

MAINTAINER Golfen Guo <golfen.guo@daocloud.io>

# Add 2048 stuff into Nginx server
COPY . /usr/share/nginx/html

EXPOSE 80

# Start Nginx and keep it running background and start php
CMD sed -i "s/ContainerID: /ContainerID: "$(hostname)"/g" /usr/share/nginx/html/index.html && nginx -g "daemon off;"

问题

Podman 致力于去掉守护进程,这也就意味着需要守护进程完成的任务 Podman 无法做到。

Restart 问题

在 Docker 中,可以通过 --restart 命令指定重启策略,当 node 重启,只要 dockerd 还能起来,有重启策略的容器就会自恢复。

因为 Podman 是将容器的管理托付给了 systemd,因此官方给的建议也是通过 systemd 来解决( https://podman.io/blogs/2018/09/13/systemd.html ),可以为需要自启动的容器编写 systemd service 文件,来描述启动方式了重启策略。

$ vim /etc/systemd/system/nginx_container.service 
 
[Unit] 
Description=Podman Nginx Service 
After=network.target 
After=network-online.target 
 
[Service] 
Type=simple 
ExecStart=/usr/bin/podman start -a nginx 
ExecStop=/usr/bin/podman stop -t 10 nginx 
Restart=always 
 
[Install] 
WantedBy=multi-user.target 

虽然有些麻烦(而且感觉有些逆潮流),不过仔细想一想,这不是就是 Linux 比较推荐的服务管理方式么。其次,Docker 虽然支持容器自启,但并不支持按照依赖关系依次启动,但是利用 systemd 的能力,可以通过 After 制定启动依赖,反而可以更好的管理启动顺序。

2019/09/29 11:13 上午 posted in  Docker