什么是SharedInformer

上次分析了controller及newInformer()和newIndexerInformer()。本次将分析SharedInformer。

SharedInformer也是通过reflector机制把ListerWatcher中的数据同步到Queue中,然后通过用户自定义的处理函数对Queue中的数据进行处理。这样的处理函数可以有很多个,它们共享同一份数据,这也是它被称为SharedInformer的原因。

所以SharedInformer是多个处理函数共享一个reflector数据流的一种结构体。

SharedInformer定义在/pkg/client/cache/shared_informer.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// NewSharedIndexInformer builds a shared informer for objects of objType that
// are listed and watched through lw.
//
// The returned informer owns a fresh indexer (keyed by
// DeletionHandlingMetaNamespaceKeyFunc and extended with the supplied
// indexers) and an empty sharedProcessor. The controller field is left unset
// here; it is created later by Run.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	// The mutation detector is given the %T type name of objType.
	detectorName := fmt.Sprintf("%T", objType)

	return &sharedIndexInformer{
		processor:             &sharedProcessor{},
		indexer:               NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:         lw,
		objectType:            objType,
		fullResyncPeriod:      resyncPeriod,
		cacheMutationDetector: NewCacheMutationDetector(detectorName),
	}
}
// sharedIndexInformer lets several event handlers share a single stream of
// deltas: every delta is first applied to indexer and then fanned out to the
// listeners registered on processor.
type sharedIndexInformer struct {
	// indexer stores the current copy of every object seen by the informer.
	indexer Indexer
	// controller is created in Run; it stays nil until then.
	controller *Controller
	// processor distributes notifications to the registered listeners.
	processor *sharedProcessor
	// cacheMutationDetector is handed every object that is added or updated.
	cacheMutationDetector CacheMutationDetector

	// The fields below are kept so that the controller can be initialized
	// late, in Run, rather than at construction time.
	listerWatcher    ListerWatcher
	objectType       runtime.Object
	fullResyncPeriod time.Duration

	// started records whether Run has initialized the controller.
	started     bool
	startedLock sync.Mutex

	// blockDeltas gives a way to pause all event distribution so that a
	// handler added after start can safely join the shared informer.
	blockDeltas sync.Mutex

	// stopCh is the channel that stops the main Run process. It is kept so
	// that handlers joining late can be stopped by the same signal.
	stopCh <-chan struct{}
}

sharedIndexInformer结构体字段中:

  • indexer: sharedIndexInformer中数据默认会在indexer中存储一份;
  • controller: 即上次分析中的Controller,通过reflector机制把ListerWatcher的数据放入到Queue中;
  • processor: 类型为sharedProcessor,负责把数据分发给各个listener;
  • listerWatcher: 即reflector机制中的数据源。

AddEventHandler()

AddEventHandler()向sharedIndexInformer添加数据的处理函数。处理函数会封装成listener,然后添加到processor中。关于processor和listener稍后分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// AddEventHandler registers handler with the informer. The handler is wrapped
// in a processorListener and appended to the processor's listener list.
//
// If the informer has not been started, registration is all that happens.
// If it is already running, the handler joins late:
//  1. delta distribution is paused by taking blockDeltas,
//  2. the listener is registered and its run/pop goroutines are started,
//  3. a synthetic add notification is queued for every object currently in
//     the indexer,
//  4. distribution resumes when the function returns.
//
// The returned error is always nil.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	lateJoin := s.started
	if lateJoin {
		// Held until return so no delta is distributed while the new
		// listener is being brought up to date.
		s.blockDeltas.Lock()
		defer s.blockDeltas.Unlock()
	}

	l := newProcessListener(handler)
	s.processor.listeners = append(s.processor.listeners, l)
	if !lateJoin {
		return nil
	}

	// The processor's run() has already started the earlier listeners, so a
	// late listener has to be started by hand.
	go l.run(s.stopCh)
	go l.pop(s.stopCh)

	for _, existing := range s.indexer.List() {
		l.add(addNotification{newObj: existing})
	}
	return nil
}

Run()

Run()方法先生成controller,其中controller的Process函数为HandleDeltas(),然后启动processor和controller。controller的启动流程在上次分析中已经分析过,processor的启动流程稍后分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Run creates the controller, starts the registered listeners and then runs
// the controller with stopCh.
//
// The controller is configured with a DeltaFIFO queue and uses HandleDeltas
// as its Process function.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.fullResyncPeriod,
		RetryOnError:     false,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)

		// Publish stopCh and start the already-registered listeners before
		// started becomes true, all under startedLock. AddEventHandler reads
		// started and stopCh under the same lock; if these steps happened
		// after the lock was released, a handler added in that window would
		// start its goroutines with a nil stopCh (so they could never be
		// stopped) and would then be started a second time by
		// processor.run.
		s.stopCh = stopCh
		// processor.run only spawns goroutines, so it does not block while
		// the lock is held.
		s.processor.run(stopCh)
		s.started = true
	}()

	s.cacheMutationDetector.Run(stopCh)
	s.controller.Run(stopCh)
}

HandleDeltas()

HandleDeltas()是处理数据的方法。由于controller的Queue为DeltaFIFO,所以HandleDeltas()处理的是Deltas。

HandleDeltas()会根据delta的类型进行不同的处理,总的来说,分为两步:

  1. 把内容存入到indexer中;
  2. 交由processor进行分发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// HandleDeltas applies a batch of deltas to the indexer and notifies every
// listener through the processor. Run installs it as the controller's Process
// function.
//
// obj must hold a Deltas value; the type assertion panics otherwise. Deltas
// are processed from oldest to newest, and the first indexer error aborts the
// batch and is returned. blockDeltas is held for the whole call, which is what
// allows AddEventHandler to pause distribution while a late handler joins.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	for _, delta := range obj.(Deltas) {
		switch delta.Type {
		case Deleted:
			if err := s.indexer.Delete(delta.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: delta.Object})

		case Sync, Added, Updated:
			s.cacheMutationDetector.AddObject(delta.Object)

			previous, found, err := s.indexer.Get(delta.Object)
			if err != nil || !found {
				// Not in the indexer (or the lookup failed): store it and
				// report it as an add.
				if err := s.indexer.Add(delta.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: delta.Object})
				continue
			}

			// Already in the indexer: replace it and report an update that
			// carries the previous copy.
			if err := s.indexer.Update(delta.Object); err != nil {
				return err
			}
			s.processor.distribute(updateNotification{oldObj: previous, newObj: delta.Object})
		}
	}
	return nil
}

sharedProcessor

sharedProcessor可以把obj分发到多个processorListener。

1
2
3
// sharedProcessor holds the listeners that each notification is delivered to.
type sharedProcessor struct {
	// listeners is appended to by AddEventHandler.
	listeners []*processorListener
}

distribute()

sharedProcessor的distribute()会把obj分发到其包含的所有listener中。分发就是调用listener.add()。

1
2
3
4
5
// distribute queues obj on every listener by calling its add method.
func (p *sharedProcessor) distribute(obj interface{}) {
	targets := p.listeners
	for i := range targets {
		targets[i].add(obj)
	}
}

run()

sharedProcessor的run()可以启动其包含的listener的run()和pop()流程。如果是后来新添加的listener,则需要手动启动run()和pop()。

1
2
3
4
5
6
// run starts the run and pop goroutines of every listener registered so far.
// It does not block: all work happens in the spawned goroutines.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	registered := p.listeners
	for i := range registered {
		l := registered[i]
		go l.run(stopCh)
		go l.pop(stopCh)
	}
}

processorListener

processorListener封装了obj的具体处理函数。可以把processorListener看成是一个缓冲池,最后的obj交由用户定义的handler处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// newProcessListener wraps handler in a processorListener with an empty
// pending queue and an unbuffered nextCh. The listener's condition variable
// is bound to its own lock.
func newProcessListener(handler ResourceEventHandler) *processorListener {
	l := new(processorListener)
	l.pendingNotifications = make([]interface{}, 0)
	l.nextCh = make(chan interface{})
	l.handler = handler

	l.cond.L = &l.lock
	return l
}
// processorListener buffers notifications for a single handler: add appends
// to pendingNotifications, pop moves them one at a time onto nextCh, and run
// takes them off nextCh and calls the handler.
type processorListener struct {
	// lock and cond protect access to pendingNotifications.
	lock sync.RWMutex
	cond sync.Cond

	// pendingNotifications is an unbounded slice holding every notification
	// that has not been delivered yet. There is one per listener, so a
	// failing or stalled listener keeps accumulating entries until the
	// process runs out of memory.
	// TODO: this is no worse than before, since reflectors were backed by
	// unbounded DeltaFIFOs, but we should try to do something better.
	pendingNotifications []interface{}

	// nextCh carries the next notification from pop to run.
	nextCh chan interface{}

	// handler is the user-supplied event handler.
	handler ResourceEventHandler
}

processorListener主要有以下字段:

  • pendingNotifications: 为缓冲池。可以通过add()方法把内容加入到缓冲池中;
  • nextCh: 待处理的obj;
  • handler: 用户定义的处理函数。

add()

把notification,即obj放入到缓冲池,即pendingNotifications中

1
2
3
4
5
6
7
// add appends notification to the pending queue and wakes any goroutine
// waiting on the condition variable.
func (p *processorListener) add(notification interface{}) {
	p.lock.Lock()
	defer p.lock.Unlock()

	queued := append(p.pendingNotifications, notification)
	p.pendingNotifications = queued
	p.cond.Broadcast()
}

pop()

pop()会从pendingNotifications中取出一个notification,然后放入到nextCh,供run()交给handler处理。pop()是在goroutine中运行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// pop repeatedly takes the oldest pending notification and sends it on nextCh
// for run to consume. It returns once stopCh is closed.
func (p *processorListener) pop(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// dequeue blocks until a notification is pending and returns it. The
	// second result is true when stopCh was found closed while the queue was
	// empty.
	dequeue := func() (interface{}, bool) {
		p.lock.Lock()
		defer p.lock.Unlock()

		for len(p.pendingNotifications) == 0 {
			// Check for shutdown before going to sleep; the check and the
			// Wait both happen with the lock held.
			select {
			case <-stopCh:
				return nil, true
			default:
			}
			p.cond.Wait()
		}

		head := p.pendingNotifications[0]
		p.pendingNotifications = p.pendingNotifications[1:]
		return head, false
	}

	for {
		item, shutdown := dequeue()
		if shutdown {
			return
		}

		// Hand the notification to run(), unless a stop arrives first.
		select {
		case <-stopCh:
			return
		case p.nextCh <- item:
		}
	}
}

run()

run()可以消费nextCh中的obj,然后交给handler进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// run receives notifications from nextCh and invokes the matching handler
// callback for each one. It returns once stopCh is closed.
func (p *processorListener) run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for {
		select {
		case <-stopCh:
			// Wake anything blocked in cond.Wait (pop waits there when the
			// queue is empty) so it can observe the stop.
			func() {
				p.lock.Lock()
				defer p.lock.Unlock()
				p.cond.Broadcast()
			}()
			return

		case next := <-p.nextCh:
			switch n := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(n.oldObj, n.newObj)
			case addNotification:
				p.handler.OnAdd(n.newObj)
			case deleteNotification:
				p.handler.OnDelete(n.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
			}
		}
	}
}

总结

SharedInformer在controller中扮演着非常重要的角色,podInformer,nodeInformer等都由SharedInformer生成,这些将在controller-manager中分析。