garbagecollector的功能主要有以下三点:
执行级联删除时(资源本身的删除由apiserver完成),回收其子资源;
执行非级联删除时,先解除子资源与该资源之间的级联关系,再回收该资源;
维护资源之间的级联关系表。
为了更好地说明garbagecollector的机制,下面以replicationController和Pods为例。在Kubernetes v1.5.2中,可以使用curl -XDELETE http://kubernetes:8080/api/v1/namespaces/default/replicationcontrollers/ubuntu?orphanDependents=false
执行级联删除,即删除replicationController时,后台会自动回收Pods。具体见“finalizer机制”的分析。
garbagecollector
先来看下garbagecollector的定义,在/pkg/controller/garbagecollector/garbagecollector.go中:
// GarbageCollector deletes objects whose owners no longer exist, and detaches
// ("orphans") the dependents of owners that are deleted with the orphan
// finalizer. It is wired together by NewGarbageCollector and started by Run.
type GarbageCollector struct {
	// restMapper maps a GroupVersionResource to its Kind (see NewGarbageCollector).
	restMapper meta.RESTMapper
	// metaOnlyClientPool provides the clients the monitors list/watch with (see monitorFor).
	metaOnlyClientPool dynamic.ClientPool
	// clientPool provides the clients used to look up owners (see processItem).
	clientPool dynamic.ClientPool
	// dirtyQueue holds *node items whose owners must be checked; consumed by worker.
	dirtyQueue *workqueue.TimedWorkQueue
	// orphanQueue holds owner *node items whose dependents must be orphaned; consumed by orphanFinalizer.
	orphanQueue *workqueue.TimedWorkQueue
	// monitors holds one informer per watched resource.
	monitors []monitor
	// propagator maintains the ownership graph from the monitors' events.
	propagator *Propagator
	// clock supplies queue-item timestamps and latency measurements.
	clock clock.Clock
	// registeredRateLimiter is built per resource in NewGarbageCollector; its use is not visible here.
	registeredRateLimiter *RegisteredRateLimiter
	// registeredRateLimiterForMonitors registers a limiter per GroupVersion for the monitors' clients.
	registeredRateLimiterForMonitors *RegisteredRateLimiter
	// absentOwnerCache remembers owner UIDs found not to exist, so processItem can skip re-fetching them.
	absentOwnerCache *UIDCache
}
其中,主要字段的含义为:
dirtyQueue: 如果某object的owner可能已不存在,则把该object加入到dirtyQueue进行检查;worker检查后如果发现该object所有的owner都不存在,则把object删除;
orphanQueue: 如果某object在删除时带有“orphan” finalizer,则先把它从其子资源(dependents)的ownerReferences中移除,即解除级联关系,然后再移除该finalizer,让object被删除;
monitors: 每种资源对应一个monitor,用于监控该资源的变化;
propagator: 级联关系维护者。
NewGarbageCollector()
NewGarbageCollector()可以生成一个新的GarbageCollector。在GarbageCollector中,会为每类资源生成一个monitor,对其变化进行监控。
// NewGarbageCollector builds a GarbageCollector, its Propagator, and one
// monitor (informer) for every resource in resources.
//
// Resources present in ignoredResources are skipped, and so are resources for
// which the REST mapper returns a *meta.NoResourceMatchError. Any other
// mapping error, or a failure to build a monitor, aborts construction and is
// returned to the caller.
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
	gc := &GarbageCollector{
		metaOnlyClientPool:               metaOnlyClientPool,
		clientPool:                       clientPool,
		restMapper:                       mapper,
		clock:                            clock.RealClock{},
		dirtyQueue:                       workqueue.NewTimedWorkQueue(),
		orphanQueue:                      workqueue.NewTimedWorkQueue(),
		registeredRateLimiter:            NewRegisteredRateLimiter(resources),
		registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources),
		// Holds up to 500 owner UIDs already known to be absent.
		absentOwnerCache: NewUIDCache(500),
	}

	graphIndex := &concurrentUIDToNode{
		RWMutex:   &sync.RWMutex{},
		uidToNode: make(map[types.UID]*node),
	}
	gc.propagator = &Propagator{
		eventQueue: workqueue.NewTimedWorkQueue(),
		uidToNode:  graphIndex,
		gc:         gc,
	}

	for _, gvr := range resources {
		if _, skip := ignoredResources[gvr]; skip {
			glog.V(6).Infof("ignore resource %#v", gvr)
			continue
		}
		kind, err := gc.restMapper.KindFor(gvr)
		// A resource the mapper does not know about is tolerated; any other
		// mapping failure is fatal.
		if _, noMatch := err.(*meta.NoResourceMatchError); noMatch {
			glog.Warningf("ignore NoResourceMatchError for %v", gvr)
			continue
		}
		if err != nil {
			return nil, err
		}
		m, err := gc.monitorFor(gvr, kind)
		if err != nil {
			return nil, err
		}
		gc.monitors = append(gc.monitors, m)
	}
	return gc, nil
}
Run()
Run()可以启动GarbageCollector。主要流程是:
启动monitors中的各monitor,并等待它们全部完成首次同步;
启动propagator.processEvent();
启动workers个worker及workers个orphanFinalizer;
收到stopCh信号后,关闭dirtyQueue、orphanQueue和eventQueue。
// Run starts the garbage collector and blocks until stopCh is closed.
//
// Startup order:
//  1. start every monitor's informer; their handlers push events onto the
//     propagator's eventQueue;
//  2. poll every 10s until all informers report HasSynced;
//  3. start the propagator's processEvent loop;
//  4. start `workers` worker loops (consuming dirtyQueue) and `workers`
//     orphanFinalizer loops (consuming orphanQueue);
//  5. call Register().
//
// Once stopCh is closed, dirtyQueue, orphanQueue and eventQueue are shut down.
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
	glog.Infof("Garbage Collector: Initializing")

	// Start one informer per monitored resource; each feeds eventQueue.
	for i := range gc.monitors {
		go gc.monitors[i].controller.Run(stopCh)
	}

	// Wait until every informer has finished its initial sync.
	monitorsSynced := func() (bool, error) {
		for i := range gc.monitors {
			if gc.monitors[i].controller.HasSynced() {
				continue
			}
			glog.Infof("Garbage Collector: Waiting for resource monitors to be synced...")
			return false, nil
		}
		return true, nil
	}
	wait.PollInfinite(10*time.Second, monitorsSynced)
	glog.Infof("Garbage Collector: All monitored resources synced. Proceeding to collect garbage")

	// processEvent drains eventQueue, updates the ownership graph and feeds
	// orphanQueue and dirtyQueue.
	go wait.Until(gc.propagator.processEvent, 0, stopCh)

	for remaining := workers; remaining > 0; remaining-- {
		// worker: checks whether an object's owners still exist and, if none
		// do, deletes (garbage-collects) the object.
		go wait.Until(gc.worker, 0, stopCh)
		// orphanFinalizer: takes items from orphanQueue and detaches the
		// owner's dependents before removing its orphan finalizer.
		go wait.Until(gc.orphanFinalizer, 0, stopCh)
	}
	Register()

	<-stopCh
	glog.Infof("Garbage Collector: Shutting down")
	gc.dirtyQueue.ShutDown()
	gc.orphanQueue.ShutDown()
	gc.propagator.eventQueue.ShutDown()
}
monitorFor()
monitorFor()可以生成一个monitor。monitor的定义如下:
// monitor pairs an informer's controller with the store it fills, for one
// watched resource (built by monitorFor).
type monitor struct {
	// store is the informer's local cache of the resource's objects.
	store cache.Store
	// controller runs the list/watch loop; Run starts it and HasSynced reports
	// whether the initial sync has completed.
	controller *cache.Controller
}
monitorFor()方法如下:
// monitorFor builds a monitor (informer controller plus store) for resource.
//
// The informer lists/watches resource through a metadata-only client obtained
// for kind, with resync period ResourceResyncTime. Every add, update and delete
// notification is stamped with kind, wrapped in an *event and pushed onto
// gc.propagator.eventQueue, which makes the monitor that queue's producer.
//
// It returns an error only when no client can be obtained for kind.
func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
	glog.V(6).Infof("create storage for resource %s", resource)
	var m monitor
	client, err := gc.metaOnlyClientPool.ClientForGroupVersionKind(kind)
	if err != nil {
		return m, err
	}
	gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")

	// setObjectTypeMeta stamps kind onto obj's type metadata.
	setObjectTypeMeta := func(obj interface{}) {
		runtimeObject, ok := obj.(runtime.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
			// runtimeObject is a nil interface here; calling a method on it
			// would panic, so stop after reporting the error.
			return
		}
		runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
	}

	m.store, m.controller = cache.NewInformer(
		gcListWatcher(client, resource),
		nil,
		ResourceResyncTime,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				setObjectTypeMeta(obj)
				event := &event{
					eventType: addEvent,
					obj:       obj,
				}
				gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				setObjectTypeMeta(newObj)
				// Keyed fields so the literal does not depend on field order.
				event := &event{
					eventType: updateEvent,
					obj:       newObj,
					oldObj:    oldObj,
				}
				gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
			},
			DeleteFunc: func(obj interface{}) {
				// The informer may deliver a tombstone when the final state was
				// missed; unwrap it to the last known object.
				if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
					obj = deletedFinalStateUnknown.Obj
				}
				setObjectTypeMeta(obj)
				event := &event{
					eventType: deleteEvent,
					obj:       obj,
				}
				gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
			},
		},
	)
	return m, nil
}
可以看到,monitor中的store和controller就是对应informer返回的store和controller。controller的handler就是把事件加入到gc.propagator.eventQueue。现在,我们找到了eventQueue的生产者,即monitor。
worker()
worker()的作用就是消费dirtyQueue中的object,然后调用processItem()对object进行处理。processItem()会检查object的owner,如果所有owner都不存在,则把object删除,以完成级联删除;如果object本身已经不存在(或UID不匹配),则生成一个虚拟的delete事件放入eventQueue。处理出错时,object会被重新放回dirtyQueue。现在我们有了dirtyQueue的消费者,即worker。
// worker handles one item from gc.dirtyQueue: it runs processItem on the
// queued *node and, if that fails, reports the error and puts the item back
// on the queue for a later retry. Processing latency is recorded only on
// success. It returns immediately once the queue has been shut down.
func (gc *GarbageCollector) worker() {
	item, shutdown := gc.dirtyQueue.Get()
	if shutdown {
		return
	}
	defer gc.dirtyQueue.Done(item)

	if err := gc.processItem(item.Object.(*node)); err != nil {
		utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", item.Object, err))
		// Requeue with the original StartTime so a later success reports the
		// full latency.
		gc.dirtyQueue.Add(item)
		return
	}
	DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, item.StartTime))
}
// processItem decides whether the object behind item must be garbage
// collected, and deletes it when none of its owners exist any more.
//
// If the object itself is gone (NotFound), or the live object has a different
// UID than item, a virtual delete event is pushed onto the propagator's
// eventQueue so the graph is updated as if a delete had been observed, and
// nil is returned.
//
// Otherwise every ownerReference is checked in order:
//   - owners recorded in gc.absentOwnerCache are treated as absent without an
//     API call;
//   - the others are fetched by name with gc.clientPool, in item's namespace;
//   - an owner found with the expected UID means the object is kept (nil);
//   - an owner that is NotFound or has a different UID is cached as absent.
//
// If every owner is absent, the object is deleted via gc.deleteObject. Any
// other error is returned so the caller (worker) can requeue the item.
func (gc *GarbageCollector) processItem(item *node) error {
	latest, err := gc.getObject(item.identity)
	if err != nil {
		if errors.IsNotFound(err) {
			glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
			event := &event{
				eventType: deleteEvent,
				obj:       objectReferenceToMetadataOnlyObject(item.identity),
			}
			glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
			gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
			return nil
		}
		return err
	}
	// Same name but a different UID: the object item refers to no longer
	// exists (presumably it was deleted and recreated), so treat it as deleted.
	if latest.GetUID() != item.identity.UID {
		glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
		event := &event{
			eventType: deleteEvent,
			obj:       objectReferenceToMetadataOnlyObject(item.identity),
		}
		glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
		gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
		return nil
	}
	ownerReferences := latest.GetOwnerReferences()
	// An object without owners is never garbage collected here.
	if len(ownerReferences) == 0 {
		glog.V(6).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
		return nil
	}
	for _, reference := range ownerReferences {
		// Known-absent owner: skip the API round trip.
		if gc.absentOwnerCache.Has(reference.UID) {
			glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
			continue
		}
		fqKind := unversioned.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
		client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
		if err != nil {
			return err
		}
		// The owner is looked up in the dependent's namespace; it is treated
		// as namespaced exactly when the dependent has a namespace.
		resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0)
		if err != nil {
			return err
		}
		owner, err := client.Resource(resource, item.identity.Namespace).Get(reference.Name)
		if err == nil {
			// An owner with this name but another UID is a different object.
			if owner.GetUID() != reference.UID {
				glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
				gc.absentOwnerCache.Add(reference.UID)
				continue
			}
			// One live owner is enough to keep the object.
			glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
			return nil
		} else if errors.IsNotFound(err) {
			gc.absentOwnerCache.Add(reference.UID)
			glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
		} else {
			return err
		}
	}
	glog.V(2).Infof("none of object %s's owners exist any more, will garbage collect it", item.identity)
	return gc.deleteObject(item.identity)
}
orphanFinalizer()
orphanFinalizer()会消费orphanQueue中的object,然后调用orphanDependents()(源码中拼写为orhpanDependents)解除对应dependents的级联关系,最后调用removeOrphanFinalizer()移除该object的“orphan” finalizer,并通过Update()写回该object。当object已处于删除中且这是它的最后一个finalizer时,这次Update()会使apiserver真正删除该object;关于Update()的删除功能,可以看/registry/generic包中的相关代码。
orphanDependents()会把owner从每个dependent的ownerReferences中清除。它是通过strategic merge patch删除list中的一个值的:patch中的{"$patch":"delete","uid":"..."}指令表示删除ownerReferences列表中uid与之匹配的那一项(ownerReferences以uid作为merge key)。
// orphanFinalizer handles one item from gc.orphanQueue.
//
// The item is an owner *node that is being deleted with the orphan finalizer
// set. First its dependents are detached from it by orhpanDependents (the owner
// is removed from each dependent's ownerReferences). Then the orphan finalizer
// is removed from the owner itself by removeOrphanFinalizer. If either step
// fails, the item is put back on orphanQueue to be retried. Processing latency
// is observed once, on success.
func (gc *GarbageCollector) orphanFinalizer() {
	timedItem, quit := gc.orphanQueue.Get()
	if quit {
		return
	}
	defer gc.orphanQueue.Done(timedItem)
	owner, ok := timedItem.Object.(*node)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object))
		// owner is nil here; continuing would panic on owner.dependentsLock.
		return
	}

	// Snapshot the dependents under the read lock so the lock is not held
	// across the API calls made below.
	owner.dependentsLock.RLock()
	dependents := make([]*node, 0, len(owner.dependents))
	for dependent := range owner.dependents {
		dependents = append(dependents, dependent)
	}
	owner.dependentsLock.RUnlock()

	if err := gc.orhpanDependents(owner.identity, dependents); err != nil {
		glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
		gc.orphanQueue.Add(timedItem)
		return
	}
	if err := gc.removeOrphanFinalizer(owner); err != nil {
		glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
		gc.orphanQueue.Add(timedItem)
		// The item is requeued with its original StartTime; return so the
		// latency is observed only once, when processing finally succeeds.
		return
	}
	OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}
// orhpanDependents removes owner from the ownerReferences of every node in
// dependents. It does this with a strategic-merge patch carrying a
// "$patch":"delete" directive keyed on the owner's UID; the dependent's own
// UID is also included in the patch.
//
// Dependents that no longer exist (NotFound) are ignored. Every other patch
// failure is collected, and if any occurred an aggregated error is returned so
// that the caller (orphanFinalizer) requeues the owner instead of removing its
// orphan finalizer.
//
// The name is misspelled ("orhpan") but is kept because callers use it.
func (gc *GarbageCollector) orhpanDependents(owner objectReference, dependents []*node) error {
	var errorsSlice []error
	for _, dependent := range dependents {
		deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, owner.UID, dependent.identity.UID)
		_, err := gc.patchObject(dependent.identity, []byte(deleteOwnerRefPatch))
		if err != nil && !errors.IsNotFound(err) {
			errorsSlice = append(errorsSlice, fmt.Errorf("orphaning %s failed with %v", dependent.identity, err))
		}
	}
	// Gate on the collected errors: any failed patch must surface to the
	// caller, otherwise the owner would be finalized with dependents still
	// pointing at it.
	if len(errorsSlice) != 0 {
		return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
	}
	glog.V(6).Infof("successfully updated all dependents")
	return nil
}
removeOrphanFinalizer()
removeOrphanFinalizer()会从object的finalizers中删除“orphan” finalizer,然后更新该object。注意,在某些条件下(object已设置deletionTimestamp且不再有任何finalizer),这次更新操作实际上会导致object被删除。遇到更新冲突时,它会重新获取object并重试,最多重试5次。
// removeOrphanFinalizer removes api.FinalizerOrphan from the live object
// behind owner and writes it back with updateObject. All other finalizers are
// kept, in their original order.
//
// NOTE(review): per the surrounding analysis, if the object is already being
// deleted and this was its last finalizer, the update lets the API server
// complete the deletion.
//
// Update conflicts are retried by re-reading the object, up to 5 attempts in
// total. It returns nil when the update succeeds or when the finalizer is
// already absent. Otherwise it returns an error, and the caller is expected to
// requeue.
func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
	const retries = 5
	for count := 0; count < retries; count++ {
		ownerObject, err := gc.getObject(owner.identity)
		if err != nil {
			return fmt.Errorf("cannot finalize owner %s, because cannot get it: %v. The garbage collector will retry later.", owner.identity, err)
		}
		accessor, err := meta.Accessor(ownerObject)
		if err != nil {
			return fmt.Errorf("cannot access the owner object: %v. The garbage collector will retry later.", err)
		}
		var newFinalizers []string
		found := false
		for _, f := range accessor.GetFinalizers() {
			if f == api.FinalizerOrphan {
				// Drop it but keep scanning: finalizers listed after the
				// orphan finalizer must be preserved, not truncated.
				found = true
				continue
			}
			newFinalizers = append(newFinalizers, f)
		}
		if !found {
			glog.V(6).Infof("the orphan finalizer is already removed from object %s", owner.identity)
			return nil
		}
		ownerObject.SetFinalizers(newFinalizers)
		_, err = gc.updateObject(owner.identity, ownerObject)
		if err == nil {
			return nil
		}
		if !errors.IsConflict(err) {
			return fmt.Errorf("cannot update the finalizers of owner %s, with error: %v, tried %d times", owner.identity, err, count+1)
		}
		// Conflict: the object changed since it was read; re-read and retry.
		glog.V(6).Infof("got conflict updating the owner object %s, tried %d times", owner.identity, count+1)
	}
	return fmt.Errorf("updateMaxRetries(%d) has reached. The garbage collector will retry later for owner %v.", retries, owner.identity)
}
接下来,我们来看下级联关系维护者——Propagator。
Propagator
Propagator维护对象之间的级联关系,定义在/pkg/controller/garbagecollector/garbagecollector.go中:
// Propagator maintains the owner/dependent graph. Its processEvent method
// consumes eventQueue (fed by the monitors) and feeds the garbage collector's
// orphanQueue and dirtyQueue.
type Propagator struct {
	// eventQueue holds *event items pushed by the monitors' event handlers.
	eventQueue *workqueue.TimedWorkQueue
	// uidToNode indexes the graph's nodes by object UID.
	uidToNode *concurrentUIDToNode
	// gc is the owning GarbageCollector, used for its queues, clock and caches.
	gc *GarbageCollector
}
对象间的级联关系是通过node的概念来维护的,node定义如下:
// node is a vertex of the ownership graph: one object, the set of nodes that
// depend on it, and the owner references it declares.
type node struct {
	// identity identifies the object: apiVersion, kind, name, UID and namespace.
	identity objectReference
	// dependentsLock guards dependents.
	dependentsLock sync.RWMutex
	// dependents is the set of nodes that list this node as an owner.
	dependents map[*node]struct{}
	// owners is the object's ownerReferences as last observed by processEvent.
	owners []metatypes.OwnerReference
}
可以看出,node的dependents字段表示该node的dependents;owners字段表示该node的owners。
insertNode()
insertNode()可以把node加入关系图,并更新其级联关系。insertNode()会调用addDependentToOwners()。addDependentToOwners()会检查该node的各个owner是否已在关系图中;如果某个owner还不存在,就为它创建一个虚拟的owner node,并把这个虚拟owner node加入到dirtyQueue中供worker消费。worker如果发现该owner在apiserver中也不存在,就会生成一个虚拟delete事件,processEvent()处理该事件时再把它的dependents加入dirtyQueue。所以dirtyQueue的生产者是insertNode()(以及processEvent()对delete事件的处理),消费者是worker()。insertNode()在processEvent()中被调用。
// insertNode registers nd in the UID index and then links it into the graph
// as a dependent of each owner listed in nd.owners. Owners that are not yet
// known get placeholder nodes (see addDependentToOwners).
func (p *Propagator) insertNode(nd *node) {
	// Index first, so the node is reachable by UID before edges point at it.
	p.uidToNode.Write(nd)
	declaredOwners := nd.owners
	p.addDependentToOwners(nd, declaredOwners)
}
// addDependentToOwners records n as a dependent of every owner in owners.
//
// An owner that has no node in the graph yet gets a placeholder ("virtual")
// node built from the owner reference and placed in n's namespace. The
// placeholder is stored in uidToNode and queued on gc.dirtyQueue, so that a
// worker checks whether that owner really exists.
func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerReference) {
	for _, ref := range owners {
		if existing, found := p.uidToNode.Read(ref.UID); found {
			existing.addDependent(n)
			continue
		}
		// An owner reference carries no namespace of its own, so the
		// dependent's namespace is used for the placeholder.
		virtual := &node{
			identity: objectReference{
				OwnerReference: ref,
				Namespace:      n.identity.Namespace,
			},
			dependents: make(map[*node]struct{}),
		}
		glog.V(6).Infof("add virtual node.identity: %s\n\n", virtual.identity)
		p.uidToNode.Write(virtual)
		p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: virtual})
		virtual.addDependent(n)
	}
}
processEvent()
processEvent()从eventQueue中取得item并进行处理。处理的过程主要是维护级联关系,并在适当的时候调用insertNode()以及把object加入到orphanQueue中;处理delete事件时,它还会把被删除object的所有dependents加入dirtyQueue。所以,processEvent()是orphanQueue的生产者。
// processEvent takes one *event off eventQueue and applies it to the
// ownership graph:
//   - add/update of an unknown UID: build and insert a new node (linking it to
//     its owners), and put it on gc.orphanQueue if shouldOrphanDependents
//     says so;
//   - add/update of a known UID: possibly put it on gc.orphanQueue, then apply
//     any change in its ownerReferences to the graph's edges;
//   - delete: remove the node, record its UID in gc.absentOwnerCache if it had
//     dependents, and put every dependent on gc.dirtyQueue so the worker
//     re-checks its owners.
//
// Items that are not *event, or whose object metadata cannot be accessed, are
// reported through utilruntime.HandleError and dropped.
func (p *Propagator) processEvent() {
	timedItem, quit := p.eventQueue.Get()
	if quit {
		return
	}
	defer p.eventQueue.Done(timedItem)
	event, ok := timedItem.Object.(*event)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object))
		return
	}
	obj := event.obj
	accessor, err := meta.Accessor(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
		return
	}
	typeAccessor, err := meta.TypeAccessor(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
		return
	}
	glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
	existingNode, found := p.uidToNode.Read(accessor.GetUID())
	switch {
	case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
		// First sighting of this UID: build its node from the object's metadata.
		newNode := &node{
			identity: objectReference{
				OwnerReference: metatypes.OwnerReference{
					APIVersion: typeAccessor.GetAPIVersion(),
					Kind:       typeAccessor.GetKind(),
					UID:        accessor.GetUID(),
					Name:       accessor.GetName(),
				},
				Namespace: accessor.GetNamespace(),
			},
			dependents: make(map[*node]struct{}),
			owners:     accessor.GetOwnerReferences(),
		}
		p.insertNode(newNode)
		if shouldOrphanDependents(event, accessor) {
			glog.V(6).Infof("add %s to the orphanQueue", newNode.identity)
			p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode})
		}
	case (event.eventType == addEvent || event.eventType == updateEvent) && found:
		// The node may already exist as a placeholder created by
		// addDependentToOwners before the object itself was observed.
		if shouldOrphanDependents(event, accessor) {
			glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity)
			p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode})
		}
		// Presumably the owner refs added/removed relative to existingNode.owners
		// (see referencesDiffs) - only changed edges are touched below.
		added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
		if len(added) == 0 && len(removed) == 0 {
			glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event)
			// NOTE(review): this early return skips the EventProcessingLatency
			// observation at the end of the function.
			return
		}
		existingNode.owners = accessor.GetOwnerReferences()
		p.addDependentToOwners(existingNode, added)
		p.removeDependentFromOwners(existingNode, removed)
	case event.eventType == deleteEvent:
		if !found {
			glog.V(6).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
			return
		}
		p.removeNode(existingNode)
		// The read lock is held (via defer) until this function returns.
		existingNode.dependentsLock.RLock()
		defer existingNode.dependentsLock.RUnlock()
		// Remember this owner is gone so processItem can skip fetching it when
		// it checks the dependents queued below.
		if len(existingNode.dependents) > 0 {
			p.gc.absentOwnerCache.Add(accessor.GetUID())
		}
		for dep := range existingNode.dependents {
			p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep})
		}
	}
	EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime))
}
其他函数 最后来看下shouldOrphanDependents()。
shouldOrphanDependents()
shouldOrphanDependents()检查object是否刚刚进入删除状态(对于没有oldObj的事件,要求已设置deletionTimestamp;对于有oldObj的事件,要求deletionTimestamp由空变为非空),并且带有“orphan” finalizer。如果两者都满足,则返回true。
// shouldOrphanDependents reports whether e marks an object entering deletion
// while carrying the orphan finalizer (api.FinalizerOrphan).
//
// For an event without an old object, the new object must have a deletion
// timestamp. For an event with an old object, the deletion timestamp must have
// just appeared: unset on the old object and set on the new one. In both cases
// the new object's finalizers must include api.FinalizerOrphan. If oldObj's
// metadata cannot be accessed, the error is reported and false is returned.
func shouldOrphanDependents(e *event, accessor meta.Object) bool {
	if e.oldObj != nil {
		oldAccessor, err := meta.Accessor(e.oldObj)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
			return false
		}
		// The old object was already being deleted, so this event is not the
		// transition into deletion.
		if oldAccessor.GetDeletionTimestamp() != nil {
			return false
		}
	}
	if accessor.GetDeletionTimestamp() == nil {
		return false
	}
	for _, f := range accessor.GetFinalizers() {
		if f == api.FinalizerOrphan {
			return true
		}
	}
	return false
}
总结
整个garbagecollector可以看成是三个队列的交互:
eventQueue:用于存放资源的事件。生产者:monitor;消费者:Propagator的processEvent()。
orphanQueue:用于存放需要“orphan”操作(即解除级联关系)的object。生产者:Propagator的processEvent();消费者:GarbageCollector的orphanFinalizer()。
dirtyQueue:用于存放需要检查owner的object(如果所有owner都不存在,则删除该object,即级联删除)。生产者:Propagator的insertNode()(在processEvent()中调用),以及processEvent()对delete事件的处理;消费者:GarbageCollector的worker()。
那么,Pod是如何和ReplicationController关联起来的呢?在replication_controller中,会把ReplicationController的信息作为ownerReference加入到Pod中。而整个GarbageCollector正是通过object的ownerReference来维护整个级联关系表的。