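What is a Controller
Earlier posts covered Reflector, Store, and Queue. A Reflector syncs data from a ListerWatcher into a Queue. So how does the data in the Queue get into the Store? The answer is the Controller.
A Controller runs a Reflector that fills the Queue, then repeatedly calls Pop() on the Queue; each Pop() invokes the configured Process function on the popped item.
Controller is defined in /pkg/client/cache/controller.go: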
```go
type Controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
}

type Config struct {
	// Queue holds the objects waiting to be processed (embedded field).
	Queue

	// ListerWatcher lists and watches the objects (embedded field).
	ListerWatcher

	// Process is called for each item popped from the Queue.
	Process ProcessFunc

	ObjectType runtime.Object

	FullResyncPeriod time.Duration

	RetryOnError bool
}
```
As the definitions show, a Controller is built around four pieces: a Reflector, a Queue, a ListerWatcher, and a ProcessFunc. The rest of this post looks at how the Controller wires them together.
Run()
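Run() is the Controller's entry point. It first creates a Reflector, passing in the ListerWatcher and the Queue; it then calls the Reflector's RunUntil() to start the Reflector; finally it starts processLoop().
Once the Reflector is running, it syncs data from the ListerWatcher into the Queue.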
```go
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	// Start the Reflector, which fills the Queue.
	r.RunUntil(stopCh)

	// Re-run processLoop every second until stopCh is closed.
	wait.Until(c.processLoop, time.Second, stopCh)
}
```
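The start-up order in Run() can be illustrated with a small stand-alone sketch that uses only the standard library. This is not the Kubernetes code: `produce`, `consume`, `run`, and the channel-based queue are hypothetical stand-ins for the Reflector, processLoop(), Run(), and the Queue. It assumes, as the call order in Run() suggests, that the producer is started in the background and returns right away, while the consumer loop blocks the caller until the stop channel is closed.

```go
package main

import "fmt"

// produce stands in for the Reflector: it pushes items into the queue
// from its own goroutine until they run out or stopCh is closed.
func produce(items []string, queue chan<- string, stopCh <-chan struct{}) {
	go func() {
		for _, it := range items {
			select {
			case queue <- it:
			case <-stopCh:
				return
			}
		}
	}()
}

// consume stands in for processLoop: it blocks, handing each queued
// item to process, until stopCh is closed.
func consume(queue <-chan string, process func(string), stopCh <-chan struct{}) {
	for {
		select {
		case it := <-queue:
			process(it)
		case <-stopCh:
			return
		}
	}
}

// run mirrors the shape of Run(): start the producer, then block in the
// consumer. It returns the items in the order they were processed.
func run(items []string) []string {
	if len(items) == 0 {
		return nil
	}
	stopCh := make(chan struct{})
	queue := make(chan string)
	var seen []string
	produce(items, queue, stopCh)
	consume(queue, func(it string) {
		seen = append(seen, it)
		if len(seen) == len(items) {
			close(stopCh) // demo only: stop once every item was handled
		}
	}, stopCh)
	return seen
}

func main() {
	fmt.Println(run([]string{"pod-a", "pod-b", "pod-c"})) // prints [pod-a pod-b pod-c]
}
```

In the real Controller the stop channel is closed by the caller rather than by the process function; the sketch closes it itself only so that the program terminates.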
processLoop()
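processLoop() calls the Queue's Pop(), passing c.config.Process as the argument; inside Pop(), Process() is invoked on obj.
If processing fails and RetryOnError is set, obj is put back into the Queue with AddIfNotPresent().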
```go
func (c *Controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if c.config.RetryOnError {
				// Re-enqueue the object unless a newer entry is already queued.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
```
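The pop, process, and requeue cycle can be tried out with a toy queue. The sketch below is an illustration under stated assumptions, not the real cache Queue: `toyQueue`, `drain`, and `errTransient` are hypothetical names, the queue is single-goroutine, and its Pop() reports `ok=false` on an empty queue instead of blocking as the real one does.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used to simulate a failed Process call.
var errTransient = errors.New("transient failure")

// toyQueue is a hypothetical, single-goroutine stand-in for the cache Queue.
type toyQueue struct {
	keys  []string          // FIFO order of keys
	items map[string]string // key -> object
}

func newToyQueue() *toyQueue { return &toyQueue{items: map[string]string{}} }

// AddIfNotPresent enqueues obj under key unless that key is already queued.
func (q *toyQueue) AddIfNotPresent(key, obj string) {
	if _, ok := q.items[key]; ok {
		return
	}
	q.keys = append(q.keys, key)
	q.items[key] = obj
}

// Pop removes the oldest item, runs process on it, and returns process's error.
// ok is false when the queue is empty (the real Pop() would block instead).
func (q *toyQueue) Pop(process func(obj string) error) (key, obj string, ok bool, err error) {
	if len(q.keys) == 0 {
		return "", "", false, nil
	}
	key = q.keys[0]
	q.keys = q.keys[1:]
	obj = q.items[key]
	delete(q.items, key)
	return key, obj, true, process(obj)
}

// drain pops until the queue is empty, requeueing failed items when
// retryOnError is set. It returns how many times process was called.
func drain(q *toyQueue, process func(string) error, retryOnError bool) int {
	calls := 0
	for {
		key, obj, ok, err := q.Pop(process)
		if !ok {
			return calls
		}
		calls++
		if err != nil && retryOnError {
			q.AddIfNotPresent(key, obj)
		}
	}
}

func main() {
	q := newToyQueue()
	q.AddIfNotPresent("default/pod-a", "pod-a")
	failedOnce := false
	process := func(obj string) error {
		if !failedOnce {
			failedOnce = true
			return errTransient
		}
		return nil
	}
	fmt.Println(drain(q, process, true)) // prints 2: one failure, one retry
}
```

With `retryOnError` set to false the failed item is simply dropped, which matches the `RetryOnError: false` setting used by NewInformer().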
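NewInformer() wraps a Controller and returns a Store together with that Controller. Here clientState is the Store, and fifo is the Queue, implemented as a DeltaFIFO. Process points to an anonymous function that handles the Deltas popped from the DeltaFIFO: it writes each one into clientState and then calls the matching ResourceEventHandler callback.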
```go
func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, *Controller) {
	// clientState is the Store that mirrors the server-side state.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	// fifo is the Queue that the Reflector fills.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// Apply the deltas oldest to newest.
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}
```
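The branching inside Process is easier to follow on a toy model. The sketch below is a simplified illustration, not the Kubernetes types: `delta`, `handler`, `process`, and the map-based store are hypothetical, and objects are plain strings. It shows one consequence of the switch above: a Sync, Added, or Updated delta for a key that is already in the store is reported through OnUpdate, not OnAdd.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
	synced  deltaType = "Sync"
)

// delta is a hypothetical, simplified version of a queue delta.
type delta struct {
	Type deltaType
	Key  string
	Obj  string
}

// handler records which callback fired, in order.
type handler struct{ events []string }

func (h *handler) OnAdd(obj string)         { h.events = append(h.events, "add:"+obj) }
func (h *handler) OnUpdate(old, cur string) { h.events = append(h.events, "update:"+old+"->"+cur) }
func (h *handler) OnDelete(obj string)      { h.events = append(h.events, "delete:"+obj) }

// process applies the deltas to store in order and notifies h, using the
// same branching as the anonymous Process function in NewInformer().
func process(store map[string]string, h *handler, deltas []delta) {
	for _, d := range deltas {
		switch d.Type {
		case synced, added, updated:
			if old, exists := store[d.Key]; exists {
				store[d.Key] = d.Obj
				h.OnUpdate(old, d.Obj)
			} else {
				store[d.Key] = d.Obj
				h.OnAdd(d.Obj)
			}
		case deleted:
			delete(store, d.Key)
			h.OnDelete(d.Obj)
		}
	}
}

func main() {
	store := map[string]string{}
	h := &handler{}
	process(store, h, []delta{
		{added, "default/pod-a", "v1"},
		{added, "default/pod-a", "v2"}, // key already stored, so this is an update
		{deleted, "default/pod-a", "v2"},
	})
	fmt.Println(h.events, len(store)) // prints [add:v1 update:v1->v2 delete:v2] 0
}
```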
NewIndexerInformer() is similar to NewInformer(). The difference is that NewIndexerInformer() stores the data in an Indexer instead of a plain Store.
```go
func NewIndexerInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	indexers Indexers,
) (Indexer, *Controller) {
	// clientState is an Indexer: a Store with additional indexes.
	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

	// fifo is the Queue that the Reflector fills.
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// Apply the deltas oldest to newest.
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Added, Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}
```
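To give a feel for what an index adds on top of a plain key/value store, here is a rough stand-alone sketch. It is not the cache package's Indexer: `toyIndexer`, `indexFunc`, `namespaceOf`, and the string-only objects are all hypothetical, and deletion and re-indexing on update are left out. The idea it illustrates is that each index function maps an object to one or more values, and the store keeps a reverse lookup from value to keys.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// indexFunc computes the index values for an object, given its key.
type indexFunc func(key string) []string

// toyIndexer is a hypothetical store that keeps secondary indexes.
type toyIndexer struct {
	items    map[string]string                         // key -> object
	indexers map[string]indexFunc                      // index name -> index function
	indices  map[string]map[string]map[string]struct{} // index name -> value -> set of keys
}

func newToyIndexer(indexers map[string]indexFunc) *toyIndexer {
	return &toyIndexer{
		items:    map[string]string{},
		indexers: indexers,
		indices:  map[string]map[string]map[string]struct{}{},
	}
}

// Add stores obj under key and records key in every configured index.
func (t *toyIndexer) Add(key, obj string) {
	t.items[key] = obj
	for name, fn := range t.indexers {
		if t.indices[name] == nil {
			t.indices[name] = map[string]map[string]struct{}{}
		}
		for _, v := range fn(key) {
			if t.indices[name][v] == nil {
				t.indices[name][v] = map[string]struct{}{}
			}
			t.indices[name][v][key] = struct{}{}
		}
	}
}

// ByIndex returns the sorted keys whose index value under name equals value.
func (t *toyIndexer) ByIndex(name, value string) []string {
	var keys []string
	for k := range t.indices[name][value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// namespaceOf indexes a "namespace/name" key by its namespace part.
func namespaceOf(key string) []string {
	return []string{strings.SplitN(key, "/", 2)[0]}
}

func main() {
	idx := newToyIndexer(map[string]indexFunc{"namespace": namespaceOf})
	idx.Add("default/pod-a", "pod-a")
	idx.Add("default/pod-b", "pod-b")
	idx.Add("kube-system/dns", "dns")
	fmt.Println(idx.ByIndex("namespace", "default")) // prints [default/pod-a default/pod-b]
}
```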