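What is a Queue
Queue builds on Store and adds the Pop() operation (along with AddIfNotPresent() and HasSynced()). This article analyzes two Queue implementations: FIFO and DeltaFIFO.
Queue is defined in /pkg/client/cache/fifo.go:

```go
type Queue interface {
	Store
	// Pop blocks until an item is available, removes it, and passes it to the PopProcessFunc.
	Pop(PopProcessFunc) (interface{}, error)
	// AddIfNotPresent adds the item only if its key is not already present.
	AddIfNotPresent(interface{}) error
	// HasSynced reports whether the first batch of items has been popped.
	HasSynced() bool
}
```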
FIFO
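FIFO is defined in /pkg/client/cache/fifo.go:

```go
type FIFO struct {
	lock sync.RWMutex
	cond sync.Cond
	// items maps each key to its latest object.
	items map[string]interface{}
	// queue holds the keys in arrival order.
	queue []string
	// populated is set by the first Replace(), Add(), Update(), or Delete().
	populated bool
	// initialPopulationCount is the number of items inserted by the first Replace().
	initialPopulationCount int
	// keyFunc computes the key of an object.
	keyFunc KeyFunc
}
```

Where:
- the items field stores each key and its object;
- the queue field is a []string that emulates the queue operations;
- the keyFunc field is the function used to compute the key of an object.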
Add()
Add() puts obj into the FIFO. If the key already exists, the value stored under that key in items is overwritten with the new obj, and the key keeps its existing position in queue.

```go
func (f *FIFO) Add(obj interface{}) error {
	id, err := f.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	// Enqueue the key only once; an existing key is not queued again.
	if _, exists := f.items[id]; !exists {
		f.queue = append(f.queue, id)
	}
	f.items[id] = obj
	f.cond.Broadcast()
	return nil
}
```
AddIfNotPresent()
Like Add(), AddIfNotPresent() puts obj into the FIFO. The difference is that AddIfNotPresent() returns immediately, without touching the stored value, if the key of obj already exists.

```go
func (f *FIFO) AddIfNotPresent(obj interface{}) error {
	id, err := f.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.addIfNotPresent(id, obj)
	return nil
}

// addIfNotPresent assumes the caller already holds the lock.
func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
	f.populated = true
	if _, exists := f.items[id]; exists {
		return
	}
	f.queue = append(f.queue, id)
	f.items[id] = obj
	f.cond.Broadcast()
}
```
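To make the difference between Add() and AddIfNotPresent() concrete, here is a minimal sketch. miniFIFO is a hypothetical reduction of FIFO for illustration only: string keys and values, no locking, no keyFunc.

```go
package main

import "fmt"

// miniFIFO is a hypothetical, lock-free reduction of FIFO: a map holding the
// latest value per key plus a slice recording the order in which keys arrived.
type miniFIFO struct {
	items map[string]string
	queue []string
}

// add overwrites the value under an existing key, as FIFO.Add does.
func (f *miniFIFO) add(key, value string) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = value
}

// addIfNotPresent leaves an existing key untouched, as FIFO.AddIfNotPresent does.
func (f *miniFIFO) addIfNotPresent(key, value string) {
	if _, exists := f.items[key]; exists {
		return
	}
	f.queue = append(f.queue, key)
	f.items[key] = value
}

func main() {
	f := &miniFIFO{items: map[string]string{}}
	f.add("pod-a", "v1")
	f.addIfNotPresent("pod-a", "v2") // ignored: the key already exists
	fmt.Println(f.items["pod-a"])    // v1
	f.add("pod-a", "v3")             // overwrites the stored value
	fmt.Println(f.items["pod-a"], f.queue)
}
```

In both cases the key appears in queue only once; the two methods differ only in whether the stored object is replaced.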
Update()
Update() simply calls Add().

```go
func (f *FIFO) Update(obj interface{}) error {
	return f.Add(obj)
}
```
Delete()
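Delete() removes obj from items directly. The key is not removed from queue; Pop() skips it later because it no longer exists in items.

```go
func (f *FIFO) Delete(obj interface{}) error {
	id, err := f.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	// Only the map entry is removed; the key may still sit in queue.
	delete(f.items, id)
	return err // err is nil at this point
}
```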
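The interaction between Delete() and Pop() is easy to miss, so here is a small sketch of it. miniFIFO and tryPop are hypothetical names; tryPop is a non-blocking stand-in for Pop() that returns false instead of waiting.

```go
package main

import "fmt"

type miniFIFO struct {
	items map[string]string
	queue []string
}

func (f *miniFIFO) add(key, value string) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = value
}

// remove mirrors FIFO.Delete: only the map entry goes away.
func (f *miniFIFO) remove(key string) {
	delete(f.items, key)
}

// tryPop skips queue entries whose object has been deleted and reports
// false when no valid entry is left.
func (f *miniFIFO) tryPop() (string, bool) {
	for len(f.queue) > 0 {
		key := f.queue[0]
		f.queue = f.queue[1:]
		if value, ok := f.items[key]; ok {
			delete(f.items, key)
			return value, true
		}
	}
	return "", false
}

func main() {
	f := &miniFIFO{items: map[string]string{}}
	f.add("a", "obj-a")
	f.add("b", "obj-b")
	f.remove("a")
	fmt.Println(len(f.queue)) // 2: the stale key "a" is still queued
	v, ok := f.tryPop()
	fmt.Println(v, ok) // obj-b true: "a" was skipped
}
```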
List()
List() returns all the objects stored in the FIFO's items.

```go
func (f *FIFO) List() []interface{} {
	f.lock.RLock()
	defer f.lock.RUnlock()
	list := make([]interface{}, 0, len(f.items))
	for _, item := range f.items {
		list = append(list, item)
	}
	return list
}
```
ListKeys()
ListKeys() returns all the keys stored in the FIFO's items.

```go
func (f *FIFO) ListKeys() []string {
	f.lock.RLock()
	defer f.lock.RUnlock()
	list := make([]string, 0, len(f.items))
	for key := range f.items {
		list = append(list, key)
	}
	return list
}
```
Get() and GetByKey()
Get() first computes the key of obj and then fetches the object through GetByKey().
GetByKey() looks the key up in items and returns the value.

```go
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
	key, err := f.keyFunc(obj)
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
	return f.GetByKey(key)
}

func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
	f.lock.RLock()
	defer f.lock.RUnlock()
	item, exists = f.items[key]
	return item, exists, nil
}
```
Pop()
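Pop() takes the first valid obj from the FIFO, where valid means its key still exists in items (a deleted object no longer does), removes it, and passes it to the PopProcessFunc argument for processing. If queue is empty, Pop() blocks on cond until a key is added. If the process function returns an ErrRequeue, the item is put back through addIfNotPresent().

```go
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		// Block until at least one key is queued.
		for len(f.queue) == 0 {
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// The item was deleted after being queued; try the next key.
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}
```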
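The blocking and requeue behavior of Pop() can be tried out with the sketch below. blockingFIFO and errRequeue are hypothetical stand-ins for FIFO and ErrRequeue, reduced to string values; note that process runs while the lock is held, so it must not call back into the queue.

```go
package main

import (
	"fmt"
	"sync"
)

// errRequeue is a hypothetical stand-in for ErrRequeue.
type errRequeue struct{ err error }

func (e errRequeue) Error() string { return "requeue requested" }

type blockingFIFO struct {
	lock  sync.Mutex
	cond  *sync.Cond
	items map[string]string
	queue []string
}

func newBlockingFIFO() *blockingFIFO {
	f := &blockingFIFO{items: map[string]string{}}
	f.cond = sync.NewCond(&f.lock)
	return f
}

func (f *blockingFIFO) add(key, value string) {
	f.lock.Lock()
	defer f.lock.Unlock()
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = value
	f.cond.Broadcast() // wake any consumer waiting in pop
}

// pop blocks until a valid item exists, then runs process under the lock.
func (f *blockingFIFO) pop(process func(string) error) (string, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			f.cond.Wait()
		}
		key := f.queue[0]
		f.queue = f.queue[1:]
		value, ok := f.items[key]
		if !ok {
			continue // deleted after being queued
		}
		delete(f.items, key)
		err := process(value)
		if e, ok := err.(errRequeue); ok {
			// Put the item back unless a newer one arrived in the meantime.
			if _, exists := f.items[key]; !exists {
				f.queue = append(f.queue, key)
				f.items[key] = value
			}
			err = e.err
		}
		return value, err
	}
}

func main() {
	f := newBlockingFIFO()
	go f.add("a", "obj-a") // the producer wakes the waiting consumer

	attempts := 0
	for attempts < 2 {
		v, _ := f.pop(func(string) error {
			attempts++
			if attempts == 1 {
				return errRequeue{} // first attempt asks for a requeue
			}
			return nil
		})
		fmt.Println(v, attempts)
	}
}
```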
Replace()
Replace() clears the data currently in the FIFO and loads the contents of list into it. On the first call it also records the number of items in initialPopulationCount.

```go
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
	// Build the new key-to-object map before taking the lock.
	items := map[string]interface{}{}
	for _, item := range list {
		key, err := f.keyFunc(item)
		if err != nil {
			return KeyError{item, err}
		}
		items[key] = item
	}

	f.lock.Lock()
	defer f.lock.Unlock()

	if !f.populated {
		f.populated = true
		f.initialPopulationCount = len(items)
	}

	f.items = items
	f.queue = f.queue[:0]
	for id := range items {
		f.queue = append(f.queue, id)
	}
	if len(f.queue) > 0 {
		f.cond.Broadcast()
	}
	return nil
}
```
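The populated and initialPopulationCount fields only make sense together with HasSynced(), which is not listed in this article. The sketch below assumes HasSynced() returns true once populated is set and initialPopulationCount has dropped to zero; miniFIFO, tryPop, and hasSynced are hypothetical names.

```go
package main

import "fmt"

type miniFIFO struct {
	items                  map[string]string
	queue                  []string
	populated              bool
	initialPopulationCount int
}

// replace mirrors FIFO.Replace: swap in the new contents and, on the first
// call, remember how many items must be popped before the queue is synced.
func (f *miniFIFO) replace(list map[string]string) {
	if !f.populated {
		f.populated = true
		f.initialPopulationCount = len(list)
	}
	f.items = list
	f.queue = f.queue[:0]
	for key := range list {
		f.queue = append(f.queue, key)
	}
}

// tryPop is a non-blocking stand-in for Pop that also counts down
// initialPopulationCount, as Pop does.
func (f *miniFIFO) tryPop() (string, bool) {
	for len(f.queue) > 0 {
		key := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		if v, ok := f.items[key]; ok {
			delete(f.items, key)
			return v, true
		}
	}
	return "", false
}

// hasSynced is an assumption about how HasSynced could be derived from
// the two fields; it is not taken from the listings in this article.
func (f *miniFIFO) hasSynced() bool {
	return f.populated && f.initialPopulationCount == 0
}

func main() {
	f := &miniFIFO{}
	fmt.Println(f.hasSynced()) // false: nothing has been loaded yet
	f.replace(map[string]string{"a": "obj-a", "b": "obj-b"})
	f.tryPop()
	fmt.Println(f.hasSynced()) // false: one initial item is still queued
	f.tryPop()
	fmt.Println(f.hasSynced()) // true: the first batch has been popped
}
```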
Resync()
Resync() appends to queue every key in items that is not already queued, so that those objects get processed.

```go
func (f *FIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Collect the keys that are already queued.
	inQueue := sets.NewString()
	for _, id := range f.queue {
		inQueue.Insert(id)
	}
	// Queue every stored key that is missing from the queue.
	for id := range f.items {
		if !inQueue.Has(id) {
			f.queue = append(f.queue, id)
		}
	}
	if len(f.queue) > 0 {
		f.cond.Broadcast()
	}
	return nil
}
```
DeltaFIFO
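DeltaFIFO differs from FIFO: FIFO keeps only the latest obj under each key, whereas DeltaFIFO keeps every operation (delta) applied to that obj.
DeltaFIFO is defined in /pkg/client/cache/delta_fifo.go:

```go
type DeltaFIFO struct {
	lock sync.RWMutex
	cond sync.Cond
	// items maps each key to the Deltas recorded for it.
	items map[string]Deltas
	// queue holds the keys in arrival order.
	queue []string
	populated bool
	initialPopulationCount int
	// keyFunc computes the key of an object.
	keyFunc KeyFunc
	// deltaCompressor, if set, compresses the Deltas of a key.
	deltaCompressor DeltaCompressor
	// knownObjects lists the keys the consumer already knows about.
	knownObjects KeyListerGetter
}
```

Where:
- items stores each key and its Deltas; every Delta holds a type (Updated, Added, Deleted, Sync) and an Object;
- queue is a []string that emulates a first-in, first-out queue;
- keyFunc is used to compute the key of an object;
- knownObjects is the external store that actually holds the objects; it can return all keys currently stored.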
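As an illustration of what items holds, here is a sketch of the Delta and Deltas types. The definitions are reconstructed from how the listings in this article use them (Delta{actionType, obj}, d.Newest().Object) and should be read as an assumption, not as a copy of delta_fifo.go.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

// Delta records one operation together with the object it applied to.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is the ordered history for one key, oldest first.
type Deltas []Delta

// Newest returns the most recent Delta, or nil if the list is empty.
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

func main() {
	items := map[string]Deltas{}
	items["pod-a"] = append(items["pod-a"], Delta{Added, "v1"})
	items["pod-a"] = append(items["pod-a"], Delta{Updated, "v2"})
	items["pod-a"] = append(items["pod-a"], Delta{Deleted, "v2"})

	// A FIFO would hold a single object here; DeltaFIFO holds the whole history.
	fmt.Println(len(items["pod-a"]), items["pod-a"].Newest().Type) // 3 Deleted
}
```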
KeyOf()
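KeyOf() computes the key of an object. If obj is a Deltas, the newest object in it is used; if it is a DeletedFinalStateUnknown, the key recorded in it is returned; otherwise keyFunc is applied.

```go
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
	// Unwrap a Deltas to its newest object.
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = d.Newest().Object
	}
	// A tombstone already carries its key.
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return f.keyFunc(obj)
}
```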
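The three cases handled by KeyOf() can be exercised with a small sketch. The pod type and the podKey function are hypothetical; Delta, Deltas, and DeletedFinalStateUnknown are simplified re-declarations so that the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

type Delta struct {
	Type   string
	Object interface{}
}

type Deltas []Delta

// DeletedFinalStateUnknown is a tombstone that carries the key explicitly,
// because the deleted object itself may be stale or missing.
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// pod and podKey are hypothetical: a toy object type and its key function.
type pod struct{ Namespace, Name string }

func podKey(obj interface{}) (string, error) {
	p, ok := obj.(pod)
	if !ok {
		return "", errors.New("not a pod")
	}
	return p.Namespace + "/" + p.Name, nil
}

// keyOf follows the same three steps as DeltaFIFO.KeyOf.
func keyOf(obj interface{}) (string, error) {
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 {
			return "", errors.New("empty Deltas")
		}
		obj = d[len(d)-1].Object // unwrap to the newest object
	}
	if t, ok := obj.(DeletedFinalStateUnknown); ok {
		return t.Key, nil // the tombstone already knows its key
	}
	return podKey(obj)
}

func main() {
	p := pod{"default", "web"}
	k1, _ := keyOf(p)
	k2, _ := keyOf(Deltas{{"Added", p}})
	k3, _ := keyOf(DeletedFinalStateUnknown{Key: "default/web"})
	fmt.Println(k1, k2, k3) // all three resolve to default/web
}
```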
Add()
Add() calls queueActionLocked(Added, obj) to add obj to the DeltaFIFO.

```go
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}
```
Update()
Update() calls queueActionLocked(Updated, obj) to record an update of obj in the DeltaFIFO.

```go
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}
```
Delete()
Delete() first checks whether obj exists. Without knownObjects it looks only in items; with knownObjects it looks in both knownObjects and items. If obj is found, Delete() calls queueActionLocked(Deleted, obj) to record the deletion in the DeltaFIFO; otherwise it returns without queueing anything.

```go
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	if f.knownObjects == nil {
		// No external store: the object is known only if it has pending deltas.
		if _, exists := f.items[id]; !exists {
			return nil
		}
	} else {
		// Skip the deletion only if neither the store nor items knows the key.
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			return nil
		}
	}

	return f.queueActionLocked(Deleted, obj)
}
```
AddIfNotPresent()
AddIfNotPresent() adds obj to the DeltaFIFO only if items does not already contain its key. The argument must be of type Deltas.

```go
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
	deltas, ok := obj.(Deltas)
	if !ok {
		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
	}
	id, err := f.KeyOf(deltas.Newest().Object)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.addIfNotPresent(id, deltas)
	return nil
}

// addIfNotPresent assumes the caller already holds the lock.
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
	f.populated = true
	if _, exists := f.items[id]; exists {
		return
	}

	f.queue = append(f.queue, id)
	f.items[id] = deltas
	f.cond.Broadcast()
}
```
queueActionLocked()
queueActionLocked() first computes the key of obj, then wraps actionType and obj into a Delta, appends it to the Deltas stored under that key in items, and deduplicates the result. A Sync action is dropped if the object is already going to be deleted. If the key is new, it is also appended to queue.

```go
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// A Sync for an object that is about to be deleted is pointless.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
	if f.deltaCompressor != nil {
		newDeltas = f.deltaCompressor.Compress(newDeltas)
	}

	_, exists := f.items[id]
	if len(newDeltas) > 0 {
		if !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else if exists {
		// Compression removed every delta; drop the key from items.
		delete(f.items, id)
	}
	return nil
}
```
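The append-then-deduplicate step can be sketched as follows. dedupDeltas is not listed in this article, so the dedup function below is a simplified assumption: it only collapses two consecutive Deleted deltas and keeps the earlier one. miniDeltaFIFO and queueAction are hypothetical names, and the compressor and the Sync check are left out.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

type miniDeltaFIFO struct {
	items map[string]Deltas
	queue []string
}

// dedup is a simplified assumption about dedupDeltas: if the two newest
// deltas are both deletions, the newer one is dropped.
func dedup(d Deltas) Deltas {
	n := len(d)
	if n >= 2 && d[n-1].Type == Deleted && d[n-2].Type == Deleted {
		return d[:n-1]
	}
	return d
}

// queueAction appends a Delta under key and enqueues the key if it is new.
func (f *miniDeltaFIFO) queueAction(t DeltaType, key string, obj interface{}) {
	_, exists := f.items[key]
	f.items[key] = dedup(append(f.items[key], Delta{t, obj}))
	if !exists {
		f.queue = append(f.queue, key)
	}
}

func main() {
	f := &miniDeltaFIFO{items: map[string]Deltas{}}
	f.queueAction(Added, "pod-a", "v1")
	f.queueAction(Deleted, "pod-a", "v1")
	f.queueAction(Deleted, "pod-a", "v1") // duplicate deletion is dropped
	fmt.Println(f.queue, len(f.items["pod-a"])) // [pod-a] 2
}
```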
ListKeys()
ListKeys() returns the list of keys in items.

```go
func (f *DeltaFIFO) ListKeys() []string {
	f.lock.RLock()
	defer f.lock.RUnlock()
	list := make([]string, 0, len(f.items))
	for key := range f.items {
		list = append(list, key)
	}
	return list
}
```
Get() and GetByKey()
Get() first computes the key of obj and then fetches the object by calling GetByKey().
GetByKey() looks the key up in items and returns a copy of the Deltas stored under it.

```go
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
	key, err := f.KeyOf(obj)
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
	return f.GetByKey(key)
}

func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
	f.lock.RLock()
	defer f.lock.RUnlock()
	d, exists := f.items[key]
	if exists {
		// Return a copy so the caller cannot modify the stored slice.
		d = copyDeltas(d)
	}
	return d, exists, nil
}
```
Pop()
Pop() takes a PopProcessFunc as its argument. It first takes a key (the id) from queue, then fetches all the deltas stored under that id in items, removes them, and calls the PopProcessFunc to process them. As in FIFO, it blocks while queue is empty and requeues the deltas if the process function returns an ErrRequeue.

```go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		// Block until at least one key is queued.
		for len(f.queue) == 0 {
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		item, ok := f.items[id]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		if !ok {
			// The item was deleted after being queued; try the next key.
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}
```
Replace()
Replace() syncs the contents of list into the DeltaFIFO as Sync deltas, from where they are later synced into the external store. While doing so, it also queues Deleted deltas for everything that exists in the DeltaFIFO (when knownObjects is nil) or in the external store (otherwise) but is missing from list.

```go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))

	// Queue a Sync delta for every item in the new list.
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(Sync, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}

	if f.knownObjects == nil {
		// No external store: detect deletions against our own items.
		for k, oldItem := range f.items {
			if keys.Has(k) {
				continue
			}
			var deletedObj interface{}
			if n := oldItem.Newest(); n != nil {
				deletedObj = n.Object
			}
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}

		if !f.populated {
			f.populated = true
			f.initialPopulationCount = len(list)
		}

		return nil
	}

	// Detect deletions against the keys of the external store.
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}

		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}

	if !f.populated {
		f.populated = true
		f.initialPopulationCount = len(list) + queuedDeletions
	}

	return nil
}
```
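At its core, the deletion detection in Replace() is a set difference between the keys already known and the keys in the new list. A minimal sketch of that step, using plain string keys and a hypothetical missingKeys helper:

```go
package main

import (
	"fmt"
	"sort"
)

// missingKeys returns the keys in known that are absent from list; these
// are the keys for which Replace would queue a Deleted delta. The result
// is sorted only to make the output deterministic.
func missingKeys(known, list []string) []string {
	inList := make(map[string]bool, len(list))
	for _, k := range list {
		inList[k] = true
	}
	var missing []string
	for _, k := range known {
		if !inList[k] {
			missing = append(missing, k)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	known := []string{"pod-a", "pod-b", "pod-c"} // keys in the external store
	list := []string{"pod-b", "pod-d"}           // keys in the fresh list

	deletions := missingKeys(known, list)
	fmt.Println(deletions) // [pod-a pod-c]

	// On the first Replace, both the synced items and the queued deletions
	// count toward initialPopulationCount.
	fmt.Println(len(list) + len(deletions)) // 4
}
```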
Resync()
Resync() queues a Sync delta for every item in the external store (knownObjects). If an item already has pending deltas in the queue, it is skipped. Lookup failures in syncKey() are only logged, while a key error aborts the whole Resync(). One open question remains: how is recovery handled when an error occurs partway through?

```go
func (f *DeltaFIFO) Resync() error {
	var keys []string
	func() {
		f.lock.RLock()
		defer f.lock.RUnlock()
		keys = f.knownObjects.ListKeys()
	}()
	for _, k := range keys {
		if err := f.syncKey(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKey(key string) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	// Skip objects that already have pending deltas.
	if len(f.items[id]) > 0 {
		return nil
	}

	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
```
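The skip rule in syncKey() boils down to a filter over the known keys. A minimal sketch, where resyncKeys is a hypothetical helper and pending maps each key to the number of deltas already waiting for it:

```go
package main

import (
	"fmt"
	"sort"
)

// resyncKeys returns the known keys that would receive a Sync delta:
// those with no deltas already pending. The result is sorted only to
// make the output deterministic.
func resyncKeys(known []string, pending map[string]int) []string {
	var out []string
	for _, k := range known {
		if pending[k] > 0 {
			continue // already queued; a Sync would add nothing
		}
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func main() {
	known := []string{"pod-a", "pod-b", "pod-c"}
	pending := map[string]int{"pod-b": 2} // pod-b has two deltas waiting
	fmt.Println(resyncKeys(known, pending)) // [pod-a pod-c]
}
```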
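Summary
Queue is a buffer that sits between the Reflector and the Store. The Reflector syncs the contents of the ListWatcher into the Queue, and the controller mechanism then syncs the contents of the Queue into the Store.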
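The flow from Queue to Store can be pictured with a toy pipeline. Everything here is hypothetical: the delta type, the apply function, and the event data stand in for what a Reflector would deliver and what the controller would do with each popped item.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

// delta is a simplified event: an operation on one keyed object.
type delta struct {
	typ deltaType
	key string
	obj string
}

// apply plays the controller's role: it brings the store in line with one
// delta taken from the queue.
func apply(store map[string]string, d delta) {
	switch d.typ {
	case added, updated:
		store[d.key] = d.obj
	case deleted:
		delete(store, d.key)
	}
}

func main() {
	// Events a Reflector might push into the queue (made-up data).
	queue := []delta{
		{added, "pod-a", "v1"},
		{added, "pod-b", "v1"},
		{updated, "pod-a", "v2"},
		{deleted, "pod-a", ""},
	}

	store := map[string]string{}
	for len(queue) > 0 {
		d := queue[0]
		queue = queue[1:]
		apply(store, d)
	}
	fmt.Println(store) // map[pod-b:v1]
}
```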