Kubernetes Controller Manager 工作原理

2019/12/15 14:23 下午 posted in  Kubernetes

本文基于对 Kubernetes v1.16 的源码阅读,文章有一定的源码,但我会通过配图尽量描述清晰

在 Kubernetes Master 节点中,有三个重要组件:ApiServer、ControllerManager、Scheduler,它们一起承担了整个集群的管理工作。本文尝试梳理清楚 ControllerManager 的工作流程和原理。

什么是 Controller Manager

根据官方文档的说法:kube-controller-manager 运行控制器,它们是处理集群中常规任务的后台线程。

说白了,Controller Manager 就是集群内部的管理控制中心,由负责不同资源的多个 Controller 构成,共同负责集群内的 Node、Pod 等所有资源的管理,比如当通过 Deployment 创建的某个 Pod 发生异常退出时,RS Controller 便会接受并处理该退出事件,并创建新的 Pod 来维持预期副本数。

几乎每种特定资源都有特定的 Controller 维护管理以保持预期状态,而 Controller Manager 的职责便是把所有的 Controller 聚合起来:

  1. 提供基础设施降低 Controller 的实现复杂度
  2. 启动和维持 Controller 的正常运行

可以这么说,Controller 保证集群内的资源保持预期状态,而 Controller Manager 保证了 Controller 保持在预期状态。

Controller 工作流程

在讲解 Controller Manager 怎么为 Controller 提供基础设施和运行环境之前,我们先了解一下 Controller 的工作流程是什么样子的。

从比较高维度的视角看,Controller Manager 主要提供了一个分发事件的能力,而不同的 Controller 只需要注册对应的 Handler 来等待接收和处理事件。

以 Deployment Controller 举例,在 pkg/controller/deployment/deployment_controller.goNewDeploymentController 方法中,便包括了 Event Handler 的注册,对于 Deployment Controller 来说,只需要根据不同的事件实现不同的处理逻辑,便可以实现对相应资源的管理。

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addDeployment,
    UpdateFunc: dc.updateDeployment,
    // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
    DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addReplicaSet,
    UpdateFunc: dc.updateReplicaSet,
    DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: dc.deletePod,
})

可以看到,在 Controller Manager 的帮助下,Controller 的逻辑可以做的非常纯粹,只需要实现相应的 EventHandler 即可,那么 Controller Manager 都做了哪些具体的工作呢?

Controller Manager 架构

辅助 Controller Manager 完成事件分发的是 client-go,而其中比较关键的模块便是 informer。

kubernetes 在 github 上提供了一张 client-go 的架构图,从中可以看出,Controller 正是下半部分(CustomController)描述的内容,而 Controller Manager 主要完成的是上半部分。

Informer 工厂

从上图可以看到 Informer 是一个非常关键的 “桥梁” 作用,因此对 Informer 的管理便是 Controller Manager 要做的第一件事。

在 Controller Manager 启动时,便会创建一个名为 SharedInformerFactory 的单例工厂,因为每个 Informer 都会与 Api Server 维持一个 watch 长连接,所以这个单例工厂通过为所有 Controller 提供了唯一获取 Informer 的入口,来保证每种类型的 Informer 只被实例化一次。

该单例工厂的初始化逻辑:

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    factory := &sharedInformerFactory{
        client:           client,
        namespace:        v1.NamespaceAll,
        defaultResync:    defaultResync,
        informers:        make(map[reflect.Type]cache.SharedIndexInformer),
        startedInformers: make(map[reflect.Type]bool),
        customResync:     make(map[reflect.Type]time.Duration),
    }

    // Apply all options
    for _, opt := range options {
        factory = opt(factory)
    }

    return factory
}

从上面的初始化逻辑中可以看到,sharedInformerFactory 中最重要的是名为 informers 的 map,其中 key 为资源类型,而 value 便是关注该资源类型的 Informer。每种类型的 Informer 只会被实例化一次,并存储在 map 中,不同 Controller 需要相同资源的 Informer 时只会拿到同一个 Informer 实例。

对于 Controller Manager 来说,维护所有的 Informer 使其正常工作,是保证所有 Controller 正常工作的基础条件。sharedInformerFactory 通过该 map 维护了所有的 informer 实例,因此,sharedInformerFactory 也承担了提供统一启动入口的职责:

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

当 Controller Manager 启动时,最重要的就是通过该工厂的 Start 方法,将所有的 Informer 运行起来。

Informer 的创建

下面看下这些 Informer 是怎么被创建的。Controller Manager 在 cmd/kube-controller-manager/app/controllermanager.goNewControllerInitializers 函数中初识化了所有的 Controller,因为代码冗长,这里仅拿 Deployment Controller 举例子。

初始化 Deployment Controller 的逻辑在 cmd/kube-controller-manager/app/apps.gostartDeploymentController 的函数中:

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

最关键的逻辑在 deployment.NewDeploymentController 上,该函数真正创建了 Deployment Controller,而该创建函数的前三个参数分别为 Deployment、ReplicaSet、Pod 的 Informer。可以看到,Informer 的单例工厂以 ApiGroup 为路径提供了不同资源的 Informer 创建入口。

不过要注意的是,.Apps().V1().Deployments() 虽然返回的是 deploymentInformer 类型的实例,但是,deploymentInformer 其实并不是一个真正的 Informer(尽管他以 Informer 命名),它只是一个模板类,主要功能是提供关注 Deployment 这一特定资源 Informer 的创建模板:

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
    return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

真正创建 Informer 的逻辑是在 deploymentInformer.Informer() 中(client-go/informers/apps/v1/deployment.go),f.defaultInformer 是默认的 Deployment Informer 创建模板方法,通过将资源实例和该模板方法传入 Informer 工厂的 InformerFor 方法,来创建仅关注 Deployment 资源的 Informer:

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

简单梳理一下:

  1. 可以通过 Informer 工厂获得特定类型的 Informer 模板类(即这里的 deploymentInformer
  2. 真正创建该特定资源 Informer 的是 Informer 模板类的 Informer() 方法
  3. Informer() 方法只不过是通过 Informer 工厂的 InformerFor 来创建真正的 Informer

这里用到了模板方法(设计模式),虽然有一点绕口,但可以参考下图梳理一下,理解关键在于 Informer 的 差异化的创建逻辑下放给了模板类

最后,名为 sharedIndexInformer 的结构体将被实例化,并真正的承担 Informer 的职责。被注册到 Informer 工厂 map 中的也是该实例。

Informer 的运行

因为真正的 Informer 实例是一个 sharedIndexInformer 类型的对象,当 Informer 工厂启动时(执行 Start 方法),被真正运行起来的是 sharedIndexInformer

sharedIndexInformer 是 client-go 里的组件,它的 Run 方法虽然短短几十行,但是却承担了很多工作。到这里,才到了 Controller Manager 最有趣的部分。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

sharedIndexInformer 的启动逻辑主要做了下面几件事:

  1. 创建了名为 fifo 的队列
  2. 创建并运行了一个名为 controller 的实例
  3. 启动了 cacheMutationDetector
  4. 启动了 processor

这几个名词(或者说组件)前文并没有提到过,而这四件事情是 Controller Manager 工作的核心内容,因此下面我会分别介绍。

sharedIndexInformer

sharedIndexInformer 是一个共享的 Informer 框架,不同的 Controller 只需要提供一个模板类(比如上文提到的 deploymentInformer ),便可以创建一个符合自己需求的特定 Informer。

sharedIndexInformer 包含了一堆工具来完成 Informer 的任务,其主要代码在 client-go/tools/cache/shared_informer.go 中。其创建逻辑也在其中:

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      objType,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

在创建逻辑中,有几个东西需要留意:

  1. processor:提供了 EventHandler 注册和事件分发的功能
  2. indexer:提供了资源缓存的功能
  3. listerWatcher:由模板类提供,包含特定资源的 List 和 Watch 方法
  4. objectType:用来标记关注哪种特定资源类型
  5. cacheMutationDetector:监控 Informer 的缓存

除此之外,还包含了上文启动逻辑中提到了 DeltaFIFO 队列和 controller,下面就分别介绍。

sharedProcessor

processor 是 sharedIndexInformer 中一个非常有趣的组件,Controller Manager 通过一个 Informer 单例工厂来保证不同的 Controller 共享了同一个 Informer,但是不同的 Controller 对该共享的 Informer 注册的 Handler 不同,那么 Informer 应该怎么管理被注册的 Handler 呢?

processor 便是用来管理被注册的 Handler 以及将事件分发给不同 Handler 的组件。

type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

sharedProcessor 的工作核心是围绕着 listeners 这个 Listener 切片展开的。

当我们注册一个 Handler 到 Informer 时,最终会被转换为一个名为 processorListener 结构体的实例:

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
    ret := &processorListener{
        nextCh:                make(chan interface{}),
        addCh:                 make(chan interface{}),
        handler:               handler,
        pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
        requestedResyncPeriod: requestedResyncPeriod,
        resyncPeriod:          resyncPeriod,
    }

    ret.determineNextResync(now)

    return ret
}

该实例主要包含两个 channel 和外面注册的 Handler 方法。而此处被实例化的 processorListener 对象最终会被添加到 sharedProcessor.listeners 列表中:

func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

如图所示,Controller 中的 Handler 方法最终会被添加到 Listener 中,而 Listener 将会被 append 到 sharedProcessor 的 Listeners 切片中。

前文提到,sharedIndexInformer 启动时会将 sharedProcessor 运行起来,而 sharedProcessor 的启动逻辑便是和这些 listener 有关:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看到,sharedProcessor 启动时会依次执行 listenerrunpop 方法,我们现在看下这两个方法。

listener 的启动

因为 listener 包含了 Controller 注册进来的 Handler 方法,因此 listener 最重要的职能就是当事件发生时来触发这些方法,而 listener.run 就是不停的从 nextCh 这个 channel 中拿到事件并执行对应的 handler:

func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

可以看到,listener.run 不停的从 nextCh 这个 channel 中拿到事件,但是 nextCh 这个 channel 里的事件又是从哪来的呢?listener.pop 的职责便是将事件放入 nextCh 中。

listener.pop 是一段非常精巧和有趣的逻辑:

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

listener 之所以包含了两个 channel:addChnextCh,是因为 Informer 无法预知 listener.handler 的事件消费的速度是否大于事件生产的速度,因此添加了一个名为 pendingNotifications 的缓冲队列来保存未来得及消费的事件。

pop 方法一方面会不停的从 addCh 中获得最新事件,以保证不会让生产方阻塞。然后判断是否存在 buffer,如果存在则把事件添加到 buffer 中,如果不存在则尝试推给 nextCh

而另一方面,会判断 buffer 中是否还有事件,如果还有存量,则不停的传递给 nextCh

pop 方法实现了一个带 buffer 的分发机制,使得事件可以源源不断的从 addChnextCh。但是问题来了,那 addCh 的事件从哪来呢。

其实来源非常简单,listener 有一个 add 方法,入参是一个事件,该方法会将新事件推入 addCh 中。而调用该 add 方法的是管理所有 listenersharedProcessor

上面提到过,sharedProcessor 的职责便是管理所有的 Handler 以及分发事件,而真正做分发工作的是 distribute 方法:

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

到目前为止,我们有一部分比较清晰了:

  1. Controller 将 Handler 注册给 Informer
  2. Informer 通过 sharedProcessor 维护了所有的 Handler(listener)
  3. Informer 收到事件时,通过 sharedProcessor.distribute 将事件分发下去
  4. Controller 被触发对应的 Handler 来处理自己的逻辑

那么剩下的问题就是 Informer 的事件从哪来呢?

DeltaFIFO

在分析 Informer 获取事件之前,需要提前讲一个非常有趣的小工具,就是在 sharedIndexInformer.Run 的时候创建的 fifo 队列:

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO 是一个非常有趣的队列,相关代码定义在 client-go/tools/cache/delta_fifo.go 中。对于一个队列来说,最重要的肯定是 Add 方法和 Pop 方法,DeltaFIFO 提供了多个 Add 方法,虽然根据不同的事件类型(add/update/delete/sync)区分不同的方法,但是最终都会执行 queueActionLocked

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // If object is supposed to be deleted (last event is Deleted),
    // then we should ignore Sync events, because it would result in
    // recreation of this object.
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }

    newDeltas := append(f.items[id], Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        // We need to remove this from our map (extra items in the queue are
        // ignored if they are not in the map).
        delete(f.items, id)
    }
    return nil
}

queueActionLocked 方法的第一个参数 actionType 便是事件类型:

const (
    Added   DeltaType = "Added"   // watch api 获得的创建事件
    Updated DeltaType = "Updated" // watch api 获得的更新事件
    Deleted DeltaType = "Deleted" // watch api 获得的删除事件
    Sync DeltaType = "Sync"       // 触发了 List Api,需要刷新缓存
)

从事件类型以及入队列方法可以看出,这是一个带有业务功能的队列,并不是单纯的“先入先出”,入队列方法中有两个非常精巧的设计:

  1. 入队列的事件会先判断该资源是否存在未被消费的事件,然后适当处理
  2. 如果 list 方法时发现该资源已经被删除了,则不再处理

第二点比较好理解,如果触发了 List 请求,而且发现要被处理的资源已经被删除了,则就不需要再入队列处理。而第一点需要结合出队列方法一起来看:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

DeltaFIFO 的 Pop 方法有一个入参,即是处理函数,出队列时,DeltaFIFO 会先根据资源 id 获得该资源 所有的事件,然后交给处理函数。

工作流程如图所示:

总体来看,DeltaFIFO 的入队列方法,会先判断该资源是否已经在 items 中, 如果已经存在,说明该资源还没有被消费(还在 queue 中排队),则直接将事件 append 到 items[resource_id] 中即可。如果发现不在 items 中,便会创建 items[resource_id],并将资源 id append 到 queue 中。

而 DeltaFIFO 出队列方法,会从 queue 中拿到队列最前面的资源 id,然后从 items 中拿走该资源所有的事件,最后调用 Pop 方法传入的 PopProcessFunc 类型的处理函数。

因此,DeltaFIFO 的特点在于,入队列的是(资源的)事件,而出队列时是拿到的是最早入队列的资源的所有事件。这样的设计保证了不会因为有某个资源疯狂的制造事件,导致其他资源没有机会被处理而产生饥饿。

controller

DeltaFIFO 是一个非常重要的组件,真正让他发挥价值的,便是 Informer 的 controller

虽然 K8s 源码中的确用的是 controller 这个词,但是此 controller 并不是 Deployment Controller 这种资源控制器。而是一个承上启下的事件控制器(从 API Server 拿到事件,下发给 Informer 进行处理)。

controller 的职责就两个:

  1. 通过 List-Watch 从 Api Server 获得事件、并将该事件推入 DeltaFIFO 中
  2. sharedIndexInformerHandleDeltas 方法作为参数,来调用 DeltaFIFO 的 Pop 方法

controller 的定义非常简单,它的核心就是 Reflector

type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}

Reflector 的代码比较繁琐但是功能比较简单,就是通过 sharedIndexInformer 里定义的 listerWatcher 进行 List-Watch,并将获得的事件推入 DeltaFIFO 中。

controller 启动之后会先将 Reflector 启动,然后在执行 processLoop,通过一个死循环,不停的将从 DeltaFIFO 读出需要处理的资源事件,并交给 sharedIndexInformerHandleDeltas 方法(创建 controller 时赋值给了 config.Process)。

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

如果我们再查看下 sharedIndexInformer 的 HandleDeltas 方法,就会发现整个事件消费流程被打通了:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

前面我们知道了 processor.distribute 方法可以将事件分发给所有 listener,而 controller 会使用 Reflector 从 ApiServer 拿到事件,并入队列,然后通过 processLoop 从队列中拿出要处理的资源的所有事件,最后通过 sharedIndexInformerHandleDeltas 方法,调用了 processor.distribute

因此,我们可以将整个事件流向整理为下图:

Indexer

以上,我们将事件从接收到分发,中间所有的逻辑已经梳理了一遍,但是在 sharedIndexInformer 的 HandleDeltas 方法中,还有一些逻辑比较令人注意,就是所有的事件都会先对 s.indexer 进行更新,然后在分发。

前面提到 Indexer 是一个线程安全的存储,作为缓存使用,为了减轻资源控制器(Controller)查询资源时对 ApiServer 的压力。

当有任何事件更新时,会先刷新 Indexer 里的缓存,然后再将事件分发给资源控制器,资源控制器在需要获得资源详情的时候,优先从 Indexer 获得,就可以减少对 APIServer 不必要的查询请求。

Indexer 存储的具体实现在 client-go/tools/cache/thread_safe_store.go 中,数据存储在 threadSafeMap 中:

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

从本质上讲,threadSafeMap 就是加了一个读写锁的 map。除此之外,还可以定义索引,索引的实现非常有趣,通过两个字段完成:

  1. Indexers 是一个 map,定义了若干求索引函数,key 为 indexName,value 为求索引的函数(计算资源的索引值)。
  2. Indices 则保存了索引值和数据 key 的映射关系,Indices 是一个两层的 map,第一层的 key 为 indexName,和 Indexers 对应,确定使用什么方法计算索引值,value 是一个 map,保存了 “索引值-资源key” 的关联关系。

相关逻辑比较简单,可以参考下图:

MutationDetector

sharedIndexInformerHandleDeltas 方法中,除了向 s.indexer 更新的数据之外,还向 s.cacheMutationDetector 更新了数据。

在一开始讲到 sharedIndexInformer 启动时还会启动一个 cacheMutationDetector,来监控 indexer 的缓存。

因为 indexer 缓存的其实是一个指针,多个 Controller 访问 indexer 缓存的资源,其实获得的是同一个资源实例。如果有一个 Controller 并不本分,修改了资源的属性,势必会影响到其他 Controller 的正确性。

MutationDetector 的作用正是定期检查有没有缓存被修改,当 Informer 接收到新事件时,MutationDetector 会保存该资源的指针(和 indexer 一样),以及该资源的深拷贝。通过定期检查指针指向的资源和开始存储的深拷贝是否一致,便知道被缓存的资源是否被修改。

不过,具体是否启用监控是受到环境变量 KUBE_CACHE_MUTATION_DETECTOR 影响的,如果不设置该环境变量,sharedIndexInformer 实例化的是 dummyMutationDetector,在启动后什么事情也不做。

如果 KUBE_CACHE_MUTATION_DETECTOR 为 true,则 sharedIndexInformer 实例化的是 defaultCacheMutationDetector,该实例会以 1s 为间隔,定期执行检查缓存,如果发现缓存被修改,则会触发一个失败处理函数,如果该函数没被定义,则会触发一个 panic。

总结

本文讲解的应该算是狭义的 Controller Manager,毕竟没有包含具体的资源管理器(Controller),而只是讲解 Controller Manager 是怎么 “Manage Controller” 的。

可以看到 Controller Manager 做了很多工作来保证 Controller 可以只专注于处理自己关心的事件,而这些工作的核心就是 Informer,当理解了 Informer 是如何与其他组件协同工作,那么 Controller Manager 为资源管理器铺垫了什么也就了然了。