Microservices study notes - the informer and watch mechanism of Kubernetes controllers

Overview

This article records how I learned and came to understand the Kubernetes controller mechanism.

I. Startup

1.main()

kubernetes/cmd/kube-controller-manager/controller-manager.go - main()

1. app.NewControllerManagerCommand() builds the controller manager's command
2. command.Execute() executes that command, which calls its Run function

2.Run()

kubernetes/cmd/kube-controller-manager/app/controllermanager.go - Run()

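1. Set up the health checks (20 s by default)
2. Start the controller manager's HTTP servers; c.SecureServing and c.InsecureServing determine whether the HTTPS (secure) and plain HTTP (insecure) endpoints are started
3. Call CreateControllerContext() to build the context struct used to look up resources, which holds the cloud provider, the clientBuilder and similar components
4. Call StartControllers() to start the set of controllers returned by NewControllerInitializers()
5. Start the relevant informers (controllerContext.InformerFactory.Start(), controllerContext.ObjectOrMetadataInformerFactory.Start())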

3.StartControllers()

kubernetes/cmd/kube-controller-manager/app/controllermanager.go - StartControllers()

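for controllerName, initFn := range controllers {
	// ... (skipping of disabled controllers and start jitter elided)
	debugHandler, started, err := initFn(ctx)
	// ... (error handling and debug-handler registration elided)
}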

The mapping from each controller name to its init function can be found in NewControllerInitializers. Taking the deployment resource as an example, initFn is startDeploymentController() (kubernetes/cmd/kube-controller-manager/app/apps.go):

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	// Add event handlers to the Deployment, ReplicaSet and Pod informers
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	// Run the deployment controller with ConcurrentDeploymentSyncs workers
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}
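The name-to-init-function table and the loop in StartControllers() can be pictured with a small, self-contained sketch. It assumes a simplified InitFunc that returns only (started bool, err error) (the real one also returns an http.Handler), and ControllerContext and newControllerInitializers below are hypothetical stand-ins, not the kube-controller-manager types.

```go
package main

import (
	"fmt"
	"sort"
)

// ControllerContext is a hypothetical, trimmed-down stand-in for the real context.
type ControllerContext struct {
	Stop <-chan struct{}
}

// InitFunc is a simplified init-function signature (the real one also returns an http.Handler).
type InitFunc func(ctx ControllerContext) (started bool, err error)

// newControllerInitializers is a hypothetical stand-in for NewControllerInitializers().
func newControllerInitializers() map[string]InitFunc {
	return map[string]InitFunc{
		"deployment": func(ctx ControllerContext) (bool, error) { return true, nil },
		"replicaset": func(ctx ControllerContext) (bool, error) { return true, nil },
		// a controller may report that it did not start
		"optional": func(ctx ControllerContext) (bool, error) { return false, nil },
	}
}

// startControllers calls every init function and collects the names of those that started.
func startControllers(ctx ControllerContext, controllers map[string]InitFunc) ([]string, error) {
	var started []string
	for name, initFn := range controllers {
		ok, err := initFn(ctx)
		if err != nil {
			return nil, fmt.Errorf("error starting %q: %w", name, err)
		}
		if !ok {
			continue
		}
		started = append(started, name)
	}
	sort.Strings(started) // map iteration order is random; sort only for stable output
	return started, nil
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	started, err := startControllers(ControllerContext{Stop: stop}, newControllerInitializers())
	if err != nil {
		panic(err)
	}
	fmt.Println(started) // [deployment replicaset]
}
```

Because Go randomizes map iteration order, the controllers in such a table are not started in any fixed order; the sketch sorts the names only to make its output deterministic.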

II. The Informer and Watch mechanism

1.InformerFactory

CreateControllerContext() sets the InformerFactory and ObjectOrMetadataInformerFactory fields of the ctx struct:

ctx := ControllerContext{
	// ... (other fields elided)
	// client-go/informers/factory.go - implemented by the sharedInformerFactory struct
	InformerFactory: sharedInformers,
	// kubernetes/pkg/controller/informer_factory.go - implemented by the informerFactory struct
	ObjectOrMetadataInformerFactory: controller.NewInformerFactory(sharedInformers, metadataInformers),
}

During Run(), the relevant informers are then started: controllerContext.InformerFactory.Start() -> sharedInformerFactory.Start() (client-go/informers/factory.go)

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		// Start each informer only once, even if Start() is called repeatedly
		if !f.startedInformers[informerType] {
			// Calls sharedIndexInformer.Run()
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
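A stripped-down sketch of the bookkeeping in sharedInformerFactory.Start(): each informer type is started at most once, so calling Start() again only starts informers that were added since the last call. The informer and factory types below are hypothetical stand-ins. In client-go an informer is added to f.informers when a controller calls .Informer() (through InformerFor()), which is why Run() starts the informer factory after StartControllers().

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// informer is a hypothetical stand-in for cache.SharedIndexInformer.
type informer struct {
	name string
	runs chan string // records each Run call so the demo can observe it
}

// Run reports that it started, then blocks until stopCh is closed.
func (i *informer) Run(stopCh <-chan struct{}) {
	i.runs <- i.name
	<-stopCh
}

// factory mimics sharedInformerFactory's bookkeeping: one informer per type plus
// a startedInformers set that makes Start safe to call repeatedly.
type factory struct {
	lock             sync.Mutex
	informers        map[string]*informer
	startedInformers map[string]bool
}

func newFactory(runs chan string, names ...string) *factory {
	f := &factory{informers: map[string]*informer{}, startedInformers: map[string]bool{}}
	for _, n := range names {
		f.informers[n] = &informer{name: n, runs: runs}
	}
	return f
}

func (f *factory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for typ, inf := range f.informers {
		if !f.startedInformers[typ] {
			go inf.Run(stopCh)
			f.startedInformers[typ] = true
		}
	}
}

func main() {
	runs := make(chan string, 10)
	stop := make(chan struct{})
	defer close(stop)

	f := newFactory(runs, "deployments", "pods")
	f.Start(stop)
	f.Start(stop) // no-op: both informers are already marked as started

	got := []string{<-runs, <-runs}
	sort.Strings(got)
	fmt.Println(got, len(runs)) // [deployments pods] 0
}
```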

2.sharedIndexInformer.Run()

client-go/tools/cache/shared_informer.go

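func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Create a DeltaFIFO queue. Its KnownObjects is s.indexer, the informer's local cache;
	// the DeltaFIFO itself only buffers the pending changes (deltas)
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	// Prepare the controller config. The controller uses s.listerWatcher to obtain
	// changed objects and notifications, pushes them into fifo, and handles each
	// popped batch of deltas with s.HandleDeltas()
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// Create the controller
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Start the processor (s.processor.run), which runs every processorListener;
	// it is stopped only after the controller has stopped
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // wait for the processor to stop
	defer close(processorStopCh) // tell the processor to stop
	wg.StartWithChannel(processorStopCh, s.processor.run)

	// Run the controller (blocks until stopCh is closed)
	s.controller.Run(stopCh)
}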

The DeltaFIFO struct is defined in client-go/tools/cache/delta_fifo.go - DeltaFIFO.

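DeltaFIFO is a producer-consumer queue: the Reflector is the producer, and the consumer calls DeltaFIFO.Pop(). In the informer, the consumer is the controller's processLoop(), which passes each popped item to Config.Process, i.e. s.HandleDeltas().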
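To make the producer-consumer idea concrete, here is a toy queue in the spirit of DeltaFIFO: deltas are accumulated per object key, each key sits in the queue at most once, and pop hands the consumer all pending deltas for the oldest key. It is a simplified sketch (no KnownObjects, no Replaced/Sync deltas, no deduplication of repeated deletions), not the client-go implementation; the names deltaFIFO, add and pop are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta is one recorded change to an object.
type Delta struct {
	Type   DeltaType
	Object string
}

// deltaFIFO keeps a list of deltas per key and pops keys in first-in order.
type deltaFIFO struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items map[string][]Delta // key -> accumulated deltas
	queue []string           // keys in FIFO order; each key appears at most once
}

func newDeltaFIFO() *deltaFIFO {
	f := &deltaFIFO{items: map[string][]Delta{}}
	f.cond = sync.NewCond(&f.mu)
	return f
}

// add is the producer side (the Reflector's Add/Update/Delete calls in client-go).
func (f *deltaFIFO) add(key string, t DeltaType, obj string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, queued := f.items[key]; !queued {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Object: obj})
	f.cond.Broadcast()
}

// pop is the consumer side: it blocks until a key is available, removes it,
// and passes all of that key's deltas to process.
func (f *deltaFIFO) pop(process func(key string, deltas []Delta)) {
	f.mu.Lock()
	for len(f.queue) == 0 {
		f.cond.Wait()
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	f.mu.Unlock()
	process(key, deltas)
}

func main() {
	f := newDeltaFIFO()
	f.add("default/web", Added, "web v1")
	f.add("default/db", Added, "db v1")
	f.add("default/web", Updated, "web v2")
	f.add("default/web", Deleted, "web v2")

	for i := 0; i < 2; i++ {
		f.pop(func(key string, deltas []Delta) {
			fmt.Println(key, len(deltas), deltas[len(deltas)-1].Type)
		})
	}
	// default/web 3 Deleted
	// default/db 1 Added
}
```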

3.Reflector.Run()

client-go/tools/cache/reflector.go

In controller.Run(), which is called at the end of step 2, a Reflector instance is created:

r := NewReflector(
	// sharedIndexInformer.listerWatcher
	c.config.ListerWatcher,
	// sharedIndexInformer.objectType
	c.config.ObjectType,
	// DeltaFIFO
	c.config.Queue,
	// sharedIndexInformer.resyncCheckPeriod
	c.config.FullResyncPeriod,
)
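
Reflector.Run() then calls ListAndWatch() repeatedly through wait.BackoffUntil, waiting for a backoff interval between calls, until stopCh is closed:

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	// Call ListAndWatch again (after a backoff) whenever it returns, until stopCh is closed
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}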

4.Reflector.ListAndWatch()

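// On the first pass, ListAndWatch lists all objects and records the resource version (RV) of the list,
// then watches from that RV to learn about later changes. The RV starts as "0" (relistResourceVersion()
// returns "0" before the first successful list). A list at RV "0" may be served from the API server's
// watch cache and so can lag behind etcd; the Reflector catches up on the missing changes through watch,
// which keeps the local cache consistent with etcd.
// 1. List the objects. With RV "0" the pager keeps its default page size (the watch cache ignores Limit
//    in that case anyway); for any other non-empty RV pagination is switched off so the list can be
//    served from the watch cache.
// 2. Call Reflector.watchHandler() to apply each watch event to the store and advance the RV.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	if err := func() error {
		var list runtime.Object
		var err error
		listCh := make(chan struct{}, 1)
		go func() {
			// List the resources, in pages if the server supports it
			// (page-size selection and panic forwarding elided)
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			list, _, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		// Wait for the list to finish, or give up if stopCh is closed
		select {
		case <-stopCh:
			return nil
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
		}
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v: %v", list, err)
		}
		// The RV of the list as a whole is where the watch will start
		resourceVersion = listMetaInterface.GetResourceVersion()
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
		}
		// Replace the contents of the DeltaFIFO with the listed items
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("unable to sync list result: %v", err)
		}
		r.setLastSyncResourceVersion(resourceVersion)
		return nil
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1) // (periodic-resync goroutine elided)
	for {
		// Watch the resources, starting from the RV returned by the list
		options = metav1.ListOptions{ResourceVersion: resourceVersion, AllowWatchBookmarks: true}
		start := r.clock.Now()
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			return err
		}
		// watchHandler applies each event to r.store, which is the DeltaFIFO
		// (client-go/tools/cache/delta_fifo.go), according to event.Type:
		// watch.Added ("ADDED") -> DeltaFIFO.Add(), appends an Added delta
		// watch.Modified ("MODIFIED") -> DeltaFIFO.Update(), appends an Updated delta
		// watch.Deleted ("DELETED") -> DeltaFIFO.Delete(), appends a Deleted delta
		// watch.Bookmark ("BOOKMARK") -> nothing is stored; only the RV advances
		// After each successfully handled event, resourceVersion is updated
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			// The error is logged (elided) and ListAndWatch returns;
			// Reflector.Run() calls it again after a backoff
			return nil
		}
	}
}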
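The list-then-watch handshake hinges on the resource version: the list returns a snapshot plus the RV it corresponds to, and the watch replays only the changes newer than that RV, so nothing that happened between the two calls is lost. The sketch below models this with a hypothetical in-memory fakeListerWatcher that uses integer RVs; real RVs are opaque strings that clients should not compare numerically.

```go
package main

import "fmt"

// event mirrors the shape of a watch event: a type, the object's key and the RV of the change.
type event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Key  string
	RV   int
}

// fakeListerWatcher is a hypothetical in-memory stand-in for the API server.
type fakeListerWatcher struct {
	objects map[string]int // key -> RV of the object's latest change
	rv      int            // latest RV overall
	history []event        // every change, in RV order
}

// apply records a change on the "server" side.
func (lw *fakeListerWatcher) apply(typ, key string) {
	lw.rv++
	lw.history = append(lw.history, event{Type: typ, Key: key, RV: lw.rv})
	if typ == "DELETED" {
		delete(lw.objects, key)
	} else {
		lw.objects[key] = lw.rv
	}
}

// List returns a snapshot of all objects plus the RV the snapshot corresponds to.
func (lw *fakeListerWatcher) List() (map[string]int, int) {
	snapshot := map[string]int{}
	for k, v := range lw.objects {
		snapshot[k] = v
	}
	return snapshot, lw.rv
}

// Watch returns the changes made after fromRV.
func (lw *fakeListerWatcher) Watch(fromRV int) []event {
	var out []event
	for _, e := range lw.history {
		if e.RV > fromRV {
			out = append(out, e)
		}
	}
	return out
}

// listAndWatch fills store from a full list, lets changesAfterList simulate writes that land
// between the list and the watch, then applies only the events newer than the list's RV.
// It returns the last RV seen, i.e. where the next watch would resume.
func listAndWatch(lw *fakeListerWatcher, store map[string]int, changesAfterList func()) int {
	items, rv := lw.List()
	for k, v := range items {
		store[k] = v
	}
	changesAfterList()
	for _, e := range lw.Watch(rv) {
		switch e.Type {
		case "ADDED", "MODIFIED":
			store[e.Key] = e.RV
		case "DELETED":
			delete(store, e.Key)
		}
		rv = e.RV
	}
	return rv
}

func main() {
	lw := &fakeListerWatcher{objects: map[string]int{}}
	lw.apply("ADDED", "default/web")
	lw.apply("ADDED", "default/db")

	store := map[string]int{}
	rv := listAndWatch(lw, store, func() {
		lw.apply("MODIFIED", "default/web")
		lw.apply("DELETED", "default/db")
	})
	fmt.Println(store, rv) // map[default/web:3] 4
}
```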

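5.processorListener.run()

client-go/tools/cache/shared_informer.go

sharedIndexInformer.HandleDeltas() updates the indexer with each popped delta and, through sharedProcessor.distribute(), sends a notification to every registered processorListener; processorListener.run() then consumes these notifications: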

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	// Call the ResourceEventHandler callback that matches each notification's type.
	// wait.Until re-runs the loop every second until stopCh is closed
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// Reached only once p.nextCh is closed and drained
		close(stopCh)
	}, 1*time.Second, stopCh)
}
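The dispatch in processorListener.run() is a plain type switch over notification structs. The sketch below reproduces it with local copies of the three notification types and a handlerFuncs struct that, like cache.ResourceEventHandlerFuncs, skips callbacks left nil. A buffered channel stands in for p.nextCh (in client-go, nextCh is fed by processorListener.pop() from an unbounded buffer).

```go
package main

import "fmt"

// Local copies of the three notification kinds carried on nextCh.
type addNotification struct{ newObj interface{} }
type updateNotification struct{ oldObj, newObj interface{} }
type deleteNotification struct{ oldObj interface{} }

// handlerFuncs mimics cache.ResourceEventHandlerFuncs: a nil callback is simply skipped.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// run drains nextCh until it is closed and dispatches each notification by type.
func run(nextCh <-chan interface{}, h handlerFuncs) {
	for next := range nextCh {
		switch n := next.(type) {
		case updateNotification:
			if h.UpdateFunc != nil {
				h.UpdateFunc(n.oldObj, n.newObj)
			}
		case addNotification:
			if h.AddFunc != nil {
				h.AddFunc(n.newObj)
			}
		case deleteNotification:
			if h.DeleteFunc != nil {
				h.DeleteFunc(n.oldObj)
			}
		default:
			fmt.Printf("unrecognized notification: %T\n", next)
		}
	}
}

func main() {
	nextCh := make(chan interface{}, 3)
	var log []string
	h := handlerFuncs{
		AddFunc: func(obj interface{}) { log = append(log, fmt.Sprintf("add %v", obj)) },
		UpdateFunc: func(oldObj, newObj interface{}) {
			log = append(log, fmt.Sprintf("update %v->%v", oldObj, newObj))
		},
		// DeleteFunc is left nil, so the delete notification is ignored
	}
	nextCh <- addNotification{newObj: "web"}
	nextCh <- updateNotification{oldObj: "web", newObj: "web2"}
	nextCh <- deleteNotification{oldObj: "web2"}
	close(nextCh)

	run(nextCh, h)
	fmt.Println(log) // [add web update web->web2]
}
```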

Taking deployment as an example, NewDeploymentController() (kubernetes/pkg/controller/deployment/deployment_controller.go) registers event handlers on three informers:

// 1. dInformer.Informer()
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addDeployment,
	UpdateFunc: dc.updateDeployment,
	// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
	DeleteFunc: dc.deleteDeployment,
})
// 2. rsInformer.Informer()
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addReplicaSet,
	UpdateFunc: dc.updateReplicaSet,
	DeleteFunc: dc.deleteReplicaSet,
})
// 3. podInformer.Informer()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	DeleteFunc: dc.deletePod,
})
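The handlers registered above do not sync anything themselves; they turn events into keys on the controller's work queue. The sketch below shows that shape under simplifying assumptions: keyFunc follows the namespace/name format of cache.MetaNamespaceKeyFunc, the queue ignores a key that is already pending (as client-go's workqueue does), and a ReplicaSet event enqueues its owning Deployment. objectMeta.OwnerDeployment is a hypothetical shortcut for resolving the ControllerRef, and orphaned ReplicaSets are ignored here.

```go
package main

import "fmt"

// objectMeta is a hypothetical, minimal view of an object's metadata.
type objectMeta struct {
	Namespace, Name string
	OwnerDeployment string // hypothetical shortcut for the ControllerRef; "" if none
}

// keyFunc builds "namespace/name" keys (just "name" for cluster-scoped objects).
func keyFunc(m objectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

// queue keeps keys in FIFO order and ignores a key that is already waiting.
type queue struct {
	items []string
	dirty map[string]bool
}

func (q *queue) Add(key string) {
	if q.dirty[key] {
		return
	}
	q.dirty[key] = true
	q.items = append(q.items, key)
}

type controller struct{ queue *queue }

func (c *controller) addDeployment(d objectMeta) { c.queue.Add(keyFunc(d)) }

func (c *controller) updateDeployment(old, cur objectMeta) { c.queue.Add(keyFunc(cur)) }

// addReplicaSet enqueues the Deployment that owns the ReplicaSet (orphans are ignored here).
func (c *controller) addReplicaSet(rs objectMeta) {
	if rs.OwnerDeployment == "" {
		return
	}
	c.queue.Add(keyFunc(objectMeta{Namespace: rs.Namespace, Name: rs.OwnerDeployment}))
}

func main() {
	c := &controller{queue: &queue{dirty: map[string]bool{}}}
	web := objectMeta{Namespace: "default", Name: "web"}
	c.addDeployment(web)
	c.updateDeployment(web, web)
	c.addReplicaSet(objectMeta{Namespace: "default", Name: "web-5d4f8", OwnerDeployment: "web"})
	c.addDeployment(objectMeta{Namespace: "default", Name: "db"})
	fmt.Println(c.queue.items) // [default/web default/db]
}
```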

III. Summary

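Taking deployment as an example:

When kube-controller-manager starts:
1. DeploymentController is initialized and its event handlers are added to the informers
2. The informers are initialized, including their DeltaFIFO and Reflector
3. The Reflector fetches the full data set once via list() and stores it in the DeltaFIFO, then uses watch() to keep the local cache (the informer's Indexer, fed from the DeltaFIFO) consistent with the data in etcd, as served by the API server
4. DeploymentController starts running: the informer pops items from the DeltaFIFO, updates the Indexer and calls the event handlers, which add deployment keys to the controller's work queue; the controller's workers take keys from that queue and process them with syncDeployment()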

1.syncDeployment()

kubernetes/pkg/controller/deployment/deployment_controller.go - DeploymentController.syncDeployment()

// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
	// Split the key into namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Look up the deployment by namespace and name in the informer's local cache (through the lister)
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).Infof("Deployment %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	// An empty selector would match every pod: emit a warning event, record the
	// observed generation and stop here
	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
		}
		return nil
	}

	// Adopt ReplicaSets whose labels match the deployment's selector and release those
	// that no longer match, updating their ControllerRef accordingly
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}
	// Get the pods managed by these ReplicaSets, grouped by ReplicaSet UID
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}
	// ... (rollout, scaling and status logic that uses rsList and podMap elided)
}
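The opening steps of syncDeployment() can be mirrored in a few lines: split the queue key, look the object up in the local cache, treat a missing object as already deleted, and copy it before changing anything. splitMetaNamespaceKey follows the behavior of cache.SplitMetaNamespaceKey ("name" or "namespace/name"); the deployment struct, the nested map used as the cache (in the spirit of Deployments(namespace).Get(name)) and syncDeployment's string result are hypothetical simplifications.

```go
package main

import (
	"fmt"
	"strings"
)

// splitMetaNamespaceKey splits "namespace/name", or a bare "name" for cluster-scoped objects.
func splitMetaNamespaceKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// deployment is a hypothetical, minimal stand-in for an apps/v1 Deployment.
type deployment struct {
	Namespace, Name    string
	Generation         int64
	ObservedGeneration int64
}

// syncDeployment mirrors the opening steps of DeploymentController.syncDeployment().
// cache (namespace -> name -> object) plays the role of the indexer that dLister reads from.
func syncDeployment(cache map[string]map[string]*deployment, key string) (string, error) {
	namespace, name, err := splitMetaNamespaceKey(key)
	if err != nil {
		return "", err
	}
	cached, ok := cache[namespace][name]
	if !ok {
		// Not in the cache any more: the deployment was deleted, nothing to do.
		return "deleted", nil
	}
	// Copy before mutating so the shared cache is never modified (cf. DeepCopy()).
	d := *cached
	d.ObservedGeneration = d.Generation
	return fmt.Sprintf("synced %s/%s gen=%d", d.Namespace, d.Name, d.ObservedGeneration), nil
}

func main() {
	cache := map[string]map[string]*deployment{
		"default": {"web": {Namespace: "default", Name: "web", Generation: 2}},
	}
	msg, err := syncDeployment(cache, "default/web")
	fmt.Println(msg, err, cache["default"]["web"].ObservedGeneration) // synced default/web gen=2 <nil> 0
}
```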