什么是Controller

之前介绍了Reflector, Store, Queue。Reflector可以ListWatcher中的数据同步到Queue中。那么Queue中的数据如何同步到Store中呢,答案就是Controller。

Controller是把Reflector中的内容放入到Queue中,然后在调用pop()时会执行定义的process()函数对item进行处理。

Controller定义在/pkg/client/cache/controller.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type Controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
}
// Config contains all the settings for a Controller.
type Config struct {
// The queue for your objects; either a FIFO or
// a DeltaFIFO. Your Process() function should accept
// the output of this Queue's Pop() method.
Queue
// Something that can list and watch your objects.
ListerWatcher
// Something that can process your objects.
Process ProcessFunc
// The type of your objects.
ObjectType runtime.Object
// Reprocess everything at least this often.
// Note that if it takes longer for you to clear the queue than this
// period, you will end up processing items in the order determined
// by FIFO.Replace(). Currently, this is random. If this is a
// problem, we can change that replacement policy to append new
// things to the end of the queue instead of replacing the entire
// queue.
FullResyncPeriod time.Duration
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter.
RetryOnError bool
}

可以看出,Controller中主要有一个Reflector,一个Queue,一个ListerWatcher,一个ProcessFunc。接下来我们来分析Controller如何将这些连接起来的。

Run()

Run()是Controller的启动方法。在Run()中,先会生成一个Reflector,其中,传入Reflector的有ListerWatcher和Queue;然后调用Reflector的RunUntil()启动Reflector流程;最后启动processLoop()方法。

Reflector启动后,就会把ListerWatcher中的数据同步到Queue中了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
//***生成新在reflector,把ListerWatcher中的内容同步到Queue中***//
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
r.RunUntil(stopCh)
//***开始运行processLoop()***//
wait.Until(c.processLoop, time.Second, stopCh)
}

processLoop()

processLoop()函数调用Queue的Pop(),传入Pop()的参数是c.config.Process,在Pop()方法中,会调用Process()对obj进行处理。

处理失败时,会把obj重新放回到Queue中。

1
2
3
4
5
6
7
8
9
10
11
12
func (c *Controller) processLoop() {
for {
//***从Queue中获取一个item,并用c.config.Process进行处理***//
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

NewInformer()

NewInformer()是对Controller的封装,返回一个Store和一个Controller。其中clientState为Store;fifo为Queue,使用的是DeltaFIFO;Process指向了一个匿名函数,可以对DeltaFIFO中的deltas进行处理,即存入到clientState中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, *Controller) {
// This will hold the client state, as we know it.
//***生成clientState,clientState是一个store***//
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
//***生成DeltaFIFO,reflector中的event会同步到fifo中,然后由Process把fifo中的delta同步到clientState***//
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
//***processLoop()函数中调用Pop()时会传入Process函数***//
Process: func(obj interface{}) error {
// from oldest to newest
//***处理obj下的所有delta***//
for _, d := range obj.(Deltas) {
switch d.Type {
//***Fankang***//
//***处理Sync, Added, Updated***//
case Sync, Added, Updated:
//***如果clientState中存在该obj,则使用Update()或OnUpdate同步***//
//***否则,使用Add()或OnAdd()同步***//
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
//***处理Deleted***//
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
//***返回clientState, controller***//
return clientState, New(cfg)
}

NewIndexerInformer()

NewIndexerInformer()和NewInformer()类似,不同的是,NewIndexerInformer()使用Indexer来存入数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, *Controller) {
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return clientState, New(cfg)
}