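The previous post analyzed the controller along with NewInformer() and NewIndexerInformer(). This post analyzes SharedInformer.
Like those, SharedInformer uses the reflector mechanism to sync data from a ListerWatcher into a Queue, and then processes the queued data with user-defined handler functions. Any number of handlers can be registered, which is why it is called a SharedInformer.
In other words, SharedInformer is a structure that lets multiple handlers share a single reflector data stream.
SharedInformer is defined in /pkg/client/cache/shared_informer.go: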
```go
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	sharedIndexInformer := &sharedIndexInformer{
		processor:             &sharedProcessor{},
		indexer:               NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:         lw,
		objectType:            objType,
		fullResyncPeriod:      resyncPeriod,
		cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
	}
	return sharedIndexInformer
}

type sharedIndexInformer struct {
	indexer    Indexer
	controller *Controller

	processor             *sharedProcessor
	cacheMutationDetector CacheMutationDetector

	listerWatcher ListerWatcher
	objectType    runtime.Object

	fullResyncPeriod time.Duration

	started     bool
	startedLock sync.Mutex

	// Held by both HandleDeltas() and a late AddEventHandler() call.
	blockDeltas sync.Mutex
	stopCh      <-chan struct{}
}
```
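The key fields of the sharedIndexInformer struct are:
- indexer: by default, a copy of the sharedIndexInformer's data is stored in the indexer;
- controller: the Controller analyzed in the previous post, which uses the reflector mechanism to put data from the ListerWatcher into the Queue;
- processor: a sharedProcessor, which distributes data to the registered listeners;
- listerWatcher: the data source for the reflector mechanism.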
AddEventHandler()
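AddEventHandler() registers a handler function with the sharedIndexInformer. The handler is wrapped in a listener, which is then added to the processor. If the informer is already running, the new listener's run() and pop() goroutines are started immediately and every object currently in the indexer is replayed to it as an add notification. The processor and listener are analyzed later.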
```go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	// Not started yet: only register; processor.run() starts the listener later.
	if !s.started {
		listener := newProcessListener(handler)
		s.processor.listeners = append(s.processor.listeners, listener)
		return nil
	}

	// Already started: hold off HandleDeltas() while the new listener joins.
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	listener := newProcessListener(handler)
	s.processor.listeners = append(s.processor.listeners, listener)

	go listener.run(s.stopCh)
	go listener.pop(s.stopCh)

	// Replay everything already in the indexer as add notifications.
	items := s.indexer.List()
	for i := range items {
		listener.add(addNotification{newObj: items[i]})
	}

	return nil
}
```
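The two branches of AddEventHandler() can be illustrated with a toy model. This is only a sketch: the `informer` and `handler` types and the `replayTo` helper are hypothetical names invented for the example, delivery is synchronous, and no locking or goroutines are modeled.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for the OnAdd callback of a ResourceEventHandler.
type handler func(obj string)

// informer is a toy model: a cache of objects plus the registered handlers.
type informer struct {
	started  bool
	cache    []string
	handlers []handler
}

// addEventHandler mirrors the two branches of AddEventHandler().
func (i *informer) addEventHandler(h handler) {
	i.handlers = append(i.handlers, h)
	if !i.started {
		return // nothing to replay; delivery begins once the informer runs
	}
	// Late registration: replay the current cache as "add" events.
	for _, obj := range i.cache {
		h(obj)
	}
}

// replayTo registers a handler on an already started informer
// and returns the objects that handler saw.
func replayTo(cache []string) []string {
	inf := &informer{started: true, cache: cache}
	var seen []string
	inf.addEventHandler(func(obj string) { seen = append(seen, obj) })
	return seen
}

func main() {
	fmt.Println(replayTo([]string{"pod-a", "pod-b"}))
}
```

The replay is what lets a handler that joins late start from the same view of the cache as handlers registered before Run().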
Run()
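Run() first builds the controller, whose Process function is HandleDeltas(), and then starts the processor and the controller. The controller's startup flow was covered in the previous post; the processor's startup flow is analyzed below.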
```go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.fullResyncPeriod,
		RetryOnError:     false,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.started = true
	}()

	s.stopCh = stopCh
	s.cacheMutationDetector.Run(stopCh)
	s.processor.run(stopCh)
	s.controller.Run(stopCh)
}
```
HandleDeltas()
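HandleDeltas() is the method that processes the data. Because the controller's Queue is a DeltaFIFO, what HandleDeltas() receives is a set of Deltas.
Each delta is handled according to its type. For Sync, Added, and Updated, the object is treated as an update if it already exists in the indexer and as an add otherwise. Overall there are two steps:
- store the object in the indexer (or remove it, for a Deleted delta);
- hand the resulting notification to the processor for distribution.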
```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			// Already in the indexer: update. Otherwise: add.
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object})
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object})
		}
	}
	return nil
}
```
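The add-versus-update decision depends on the state of the store, not on the delta type. The sketch below shows this with a plain map standing in for the indexer. The `delta` struct, the `deltaType` constants, and the string notifications are simplifications invented for the example and are not the real types.

```go
package main

import "fmt"

// deltaType and delta are simplified, hypothetical stand-ins for the real Delta types.
type deltaType string

const (
	synced  deltaType = "Sync"
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

type delta struct {
	typ   deltaType
	key   string
	value string
}

// handleDeltas applies each delta to the store (standing in for the indexer)
// and returns the notifications that would be handed to the processor.
func handleDeltas(store map[string]string, deltas []delta) []string {
	var notes []string
	for _, d := range deltas {
		switch d.typ {
		case synced, added, updated:
			// The notification kind depends on whether the key is already stored.
			if old, exists := store[d.key]; exists {
				store[d.key] = d.value
				notes = append(notes, fmt.Sprintf("update %s: %s -> %s", d.key, old, d.value))
			} else {
				store[d.key] = d.value
				notes = append(notes, fmt.Sprintf("add %s: %s", d.key, d.value))
			}
		case deleted:
			delete(store, d.key)
			notes = append(notes, "delete "+d.key)
		}
	}
	return notes
}

func main() {
	store := map[string]string{}
	for _, n := range handleDeltas(store, []delta{
		{added, "a", "v1"},
		{added, "a", "v2"}, // key already stored, so this becomes an update
		{deleted, "a", ""},
	}) {
		fmt.Println(n)
	}
}
```

In this model a second Added delta for a key that is already stored produces an update notification, matching the `exists` branch in HandleDeltas().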
sharedProcessor
sharedProcessor distributes an obj to multiple processorListeners.
```go
type sharedProcessor struct {
	listeners []*processorListener
}
```
distribute()
sharedProcessor's distribute() delivers obj to every listener it holds. Distribution simply means calling listener.add().
```go
func (p *sharedProcessor) distribute(obj interface{}) {
	for _, listener := range p.listeners {
		listener.add(obj)
	}
}
```
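As a minimal illustration of this fan-out, the sketch below uses hypothetical `listener` and `processor` types whose `add()` simply records what it receives. Buffering and goroutines are deliberately left out.

```go
package main

import "fmt"

// listener is a hypothetical stand-in that records every object it receives.
type listener struct {
	name string
	got  []string
}

func (l *listener) add(obj string) { l.got = append(l.got, obj) }

// processor holds the listeners that share one stream of objects.
type processor struct {
	listeners []*listener
}

// distribute hands the same object to every registered listener.
func (p *processor) distribute(obj string) {
	for _, l := range p.listeners {
		l.add(obj)
	}
}

func main() {
	a, b := &listener{name: "a"}, &listener{name: "b"}
	p := &processor{listeners: []*listener{a, b}}
	p.distribute("pod-1")
	p.distribute("pod-2")
	fmt.Println(a.name, a.got)
	fmt.Println(b.name, b.got)
}
```

Every listener sees every object in the same order, which is the sense in which the handlers share one data stream.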
run()
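sharedProcessor's run() starts the run() and pop() goroutines of every listener it holds. A listener added afterwards must have its run() and pop() started explicitly, which is what AddEventHandler() does.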
```go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	for _, listener := range p.listeners {
		go listener.run(stopCh)
		go listener.pop(stopCh)
	}
}
```
processorListener
processorListener wraps the function that actually handles an obj. It can be viewed as a buffer: each obj is eventually handed to the user-defined handler.
```go
func newProcessListener(handler ResourceEventHandler) *processorListener {
	ret := &processorListener{
		pendingNotifications: []interface{}{},
		nextCh:               make(chan interface{}),
		handler:              handler,
	}

	ret.cond.L = &ret.lock
	return ret
}

type processorListener struct {
	// lock/cond protects access to pendingNotifications
	lock sync.RWMutex
	cond sync.Cond

	pendingNotifications []interface{}

	nextCh chan interface{}

	handler ResourceEventHandler
}
```
processorListener has the following main fields:
- pendingNotifications: the buffer; add() appends to it;
- nextCh: the channel that carries the next obj to be handled;
- handler: the user-defined handler.
add()
add() puts the notification, i.e. the obj, into the buffer pendingNotifications and wakes any goroutine waiting on the condition variable.
```go
func (p *processorListener) add(notification interface{}) {
	p.lock.Lock()
	defer p.lock.Unlock()

	p.pendingNotifications = append(p.pendingNotifications, notification)
	p.cond.Broadcast()
}
```
pop()
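pop() takes one obj from pendingNotifications, blocking on the condition variable while the buffer is empty, and then sends it on nextCh for the handler to process. pop() runs in its own goroutine.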
```go
func (p *processorListener) pop(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for {
		blockingGet := func() (interface{}, bool) {
			p.lock.Lock()
			defer p.lock.Unlock()

			for len(p.pendingNotifications) == 0 {
				// Check for shutdown before waiting for more notifications.
				select {
				case <-stopCh:
					return nil, true
				default:
				}
				p.cond.Wait()
			}

			nt := p.pendingNotifications[0]
			p.pendingNotifications = p.pendingNotifications[1:]
			return nt, false
		}

		notification, stopped := blockingGet()
		if stopped {
			return
		}

		select {
		case <-stopCh:
			return
		case p.nextCh <- notification:
		}
	}
}
```
run()
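run() consumes objs from nextCh and passes each one to the matching handler callback. When stopCh is closed, it broadcasts on the condition variable so that a pop() blocked in Wait() can wake up and exit.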
```go
func (p *processorListener) run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for {
		var next interface{}
		select {
		case <-stopCh:
			// Wake up pop() so it can observe the shutdown.
			func() {
				p.lock.Lock()
				defer p.lock.Unlock()
				p.cond.Broadcast()
			}()
			return
		case next = <-p.nextCh:
		}

		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}
```
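The add() / pop() / run() pipeline can be reproduced in a small standalone program. The sketch below is a simplified model, not the original code. It carries plain ints instead of notifications. It also uses a `stopped` flag in place of stopCh and drains the buffer before exiting. Both are assumptions made here so that the example terminates deterministically. The `listener` type and the `deliver` helper are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a simplified model of processorListener carrying ints.
type listener struct {
	lock    sync.Mutex
	cond    *sync.Cond
	pending []int // unbounded buffer, like pendingNotifications
	stopped bool  // assumption: replaces stopCh in this sketch
	nextCh  chan int
}

func newListener() *listener {
	l := &listener{nextCh: make(chan int)}
	l.cond = sync.NewCond(&l.lock)
	return l
}

// add only appends to the buffer, so a slow handler never blocks the producer.
func (l *listener) add(n int) {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.pending = append(l.pending, n)
	l.cond.Broadcast()
}

// stop marks the listener as finished and wakes pop if it is waiting.
func (l *listener) stop() {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.stopped = true
	l.cond.Broadcast()
}

// pop moves items from the buffer to nextCh one at a time.
func (l *listener) pop() {
	defer close(l.nextCh)
	for {
		l.lock.Lock()
		for len(l.pending) == 0 && !l.stopped {
			l.cond.Wait()
		}
		if len(l.pending) == 0 { // stopped and fully drained
			l.lock.Unlock()
			return
		}
		n := l.pending[0]
		l.pending = l.pending[1:]
		l.lock.Unlock()
		l.nextCh <- n // send outside the lock so add stays non-blocking
	}
}

// run consumes nextCh and calls the handler for each item.
func (l *listener) run(handle func(int)) {
	for n := range l.nextCh {
		handle(n)
	}
}

// deliver pushes items through the pipeline and returns what the handler saw.
func deliver(items []int) []int {
	l := newListener()
	var got []int
	done := make(chan struct{})
	go l.pop()
	go func() {
		defer close(done)
		l.run(func(n int) { got = append(got, n) })
	}()
	for _, n := range items {
		l.add(n)
	}
	l.stop()
	<-done
	return got
}

func main() {
	fmt.Println(deliver([]int{1, 2, 3}))
}
```

Splitting the work into a buffer plus an unbuffered channel keeps add() cheap for the producer while the handler still sees items one at a time and in order.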
Summary
SharedInformer plays a very important role in controllers: podInformer, nodeInformer, and others are all built from SharedInformer. These will be analyzed together with controller-manager.