garbagecollector的功能主要有以下三点:

  1. 执行联级删除时(资源删除由apiserver完成),回收子资源;
  2. 执行非联级删除时,删除子资源的联级关系,再回收资源;
  3. 维护有资源之间的联级关系表。

为了更好地说明garbagecollector的机制,将以replicationController和Pods为例,在Kubernetes v1.5.2中,可以使用
curl -XDELETE http://kubernetes:8080/api/v1/namespaces/default/replicationcontrollers/ubuntu?orphanDependents=false
执行联级删除,即删除replicationController时,后台会自动回收Pods。
具体见”finalizer机制”的分析。

garbagecollector

先来看下garbagecollector的定义,在/pkg/controller/garbagecollector/garbagecollector.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// GarbageCollector is responsible for carrying out cascading deletion, and
// removing ownerReferences from the dependents if the owner is deleted with
// DeleteOptions.OrphanDependents=true.
type GarbageCollector struct {
restMapper meta.RESTMapper
// metaOnlyClientPool uses a special codec, which removes fields except for
// apiVersion, kind, and metadata during decoding.
metaOnlyClientPool dynamic.ClientPool
// clientPool uses the regular dynamicCodec. We need it to update
// finalizers. It can be removed if we support patching finalizers.
clientPool dynamic.ClientPool
dirtyQueue *workqueue.TimedWorkQueue
orphanQueue *workqueue.TimedWorkQueue
monitors []monitor
propagator *Propagator
clock clock.Clock
registeredRateLimiter *RegisteredRateLimiter
registeredRateLimiterForMonitors *RegisteredRateLimiter
// GC caches the owners that do not exist according to the API server.
absentOwnerCache *UIDCache
}

其中,主要字段的含义为:

  • dirtyQueue: 如果某object的owner不存在,则把该object加入到dirtyQueue进行检查,如果该object所有的owner都不存在,则把object删除;
  • orphanQueue: 如果某object被标记为”orphan”,则把object的子资源的联级关系删除,然后再把object删除;
  • monitors: 对某种资源变化的监控;
  • propagator: 联级关系维护者。

NewGarbageCollector()

NewGarbageCollector()可以生成一个新的GarbageCollector。在GarbageCollector中,会对某类资源生成一个monitor对其变化进行监控。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
gc := &GarbageCollector{
metaOnlyClientPool: metaOnlyClientPool,
clientPool: clientPool,
restMapper: mapper,
clock: clock.RealClock{},
dirtyQueue: workqueue.NewTimedWorkQueue(),
orphanQueue: workqueue.NewTimedWorkQueue(),
registeredRateLimiter: NewRegisteredRateLimiter(resources),
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources),
absentOwnerCache: NewUIDCache(500),
}
gc.propagator = &Propagator{
eventQueue: workqueue.NewTimedWorkQueue(),
uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
},
gc: gc,
}
for _, resource := range resources {
if _, ok := ignoredResources[resource]; ok {
glog.V(6).Infof("ignore resource %#v", resource)
continue
}
kind, err := gc.restMapper.KindFor(resource)
if err != nil {
if _, ok := err.(*meta.NoResourceMatchError); ok {
// ignore NoResourceMatchErrors for now because TPRs won't be registered
// and hence the RestMapper does not know about them. The deletableResources
// though are using discovery which included TPRs.
// TODO: use dynamic discovery for RestMapper and deletableResources
glog.Warningf("ignore NoResourceMatchError for %v", resource)
continue
}
return nil, err
}
monitor, err := gc.monitorFor(resource, kind)
if err != nil {
return nil, err
}
gc.monitors = append(gc.monitors, monitor)
}
return gc, nil
}

Run()

Run()可以启动GarbageCollector。主要流程是:

  1. 启动monitors中的各monitor;
  2. 启动propagator.processEvent();
  3. 启动i个worker及i个orphanFinalizer;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Garbage Collector: Initializing")
//***启动各种资源的monitor,以监听资源事件***//
//***把事件放入eventQueue***//
for _, monitor := range gc.monitors {
go monitor.controller.Run(stopCh)
}
wait.PollInfinite(10*time.Second, func() (bool, error) {
for _, monitor := range gc.monitors {
if !monitor.controller.HasSynced() {
glog.Infof("Garbage Collector: Waiting for resource monitors to be synced...")
return false, nil
}
}
return true, nil
})
glog.Infof("Garbage Collector: All monitored resources synced. Proceeding to collect garbage")
// worker
//***启动processEvent***//
//***从eventQueue中获取item,并存入orphanQueue中***//
go wait.Until(gc.propagator.processEvent, 0, stopCh)
//***启动workers个worker***//
for i := 0; i < workers; i++ {
//***检查owner存不存在,如果不存在,则回该资源***//
go wait.Until(gc.worker, 0, stopCh)
//***从orphanQueue中获取item,并执行联级删除***//
go wait.Until(gc.orphanFinalizer, 0, stopCh)
}
Register()
<-stopCh
glog.Infof("Garbage Collector: Shutting down")
gc.dirtyQueue.ShutDown()
gc.orphanQueue.ShutDown()
gc.propagator.eventQueue.ShutDown()
}

monitorFor()

monitorFor()可以生成一个monitor:
monitor的定义如下:

1
2
3
4
type monitor struct {
store cache.Store
controller *cache.Controller
}

monitorFor()方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//***监视某resource的事件,并加入到eventQueue中***//
func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource, kind unversioned.GroupVersionKind) (monitor, error) {
// TODO: consider store in one storage.
glog.V(6).Infof("create storage for resource %s", resource)
var monitor monitor
client, err := gc.metaOnlyClientPool.ClientForGroupVersionKind(kind)
if err != nil {
return monitor, err
}
gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
setObjectTypeMeta := func(obj interface{}) {
runtimeObject, ok := obj.(runtime.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
}
runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
}
monitor.store, monitor.controller = cache.NewInformer(
gcListWatcher(client, resource),
nil,
ResourceResyncTime,
cache.ResourceEventHandlerFuncs{
// add the event to the propagator's eventQueue.
AddFunc: func(obj interface{}) {
setObjectTypeMeta(obj)
event := &event{
eventType: addEvent,
obj: obj,
}
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
},
UpdateFunc: func(oldObj, newObj interface{}) {
setObjectTypeMeta(newObj)
event := &event{updateEvent, newObj, oldObj}
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
setObjectTypeMeta(obj)
event := &event{
eventType: deleteEvent,
obj: obj,
}
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
},
},
)
return monitor, nil
}

可以看到,monitor中的store和controller就是对应informer返回的store和controller。controller的handler就是把事件加入到gc.propagator.eventQueue。现在,我们找到了eventQueue的生产者,即monitor。

worker()

worker()的作用就是消费dirtyQueue中的object,然后调用processItem()对object进行处理。
processItem()会检查object的owner,如果所有owner都不存在,则把object删除,以完成联级删除。
现在我们有了dirtyQueue的消费者,即worker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
func (gc *GarbageCollector) worker() {
timedItem, quit := gc.dirtyQueue.Get()
if quit {
return
}
defer gc.dirtyQueue.Done(timedItem)
//***调用processItem()处理item***//
err := gc.processItem(timedItem.Object.(*node))
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", timedItem.Object, err))
// retry if garbage collection of an object failed.
gc.dirtyQueue.Add(timedItem)
return
}
DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}
//***检查item对应的owner是否存在,如果都不存在,则把item删除***//
func (gc *GarbageCollector) processItem(item *node) error {
// Get the latest item from the API server
latest, err := gc.getObject(item.identity)
if err != nil {
if errors.IsNotFound(err) {
// the Propagator can add "virtual" node for an owner that doesn't
// exist yet, so we need to enqueue a virtual Delete event to remove
// the virtual node from Propagator.uidToNode.
glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
event := &event{
eventType: deleteEvent,
obj: objectReferenceToMetadataOnlyObject(item.identity),
}
glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
return nil
}
return err
}
if latest.GetUID() != item.identity.UID {
glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
event := &event{
eventType: deleteEvent,
obj: objectReferenceToMetadataOnlyObject(item.identity),
}
glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
return nil
}
//***获取ownerReferences***//
ownerReferences := latest.GetOwnerReferences()
if len(ownerReferences) == 0 {
glog.V(6).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
return nil
}
// TODO: we need to remove dangling references if the object is not to be
// deleted.
for _, reference := range ownerReferences {
if gc.absentOwnerCache.Has(reference.UID) {
glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
continue
}
// TODO: we need to verify the reference resource is supported by the
// system. If it's not a valid resource, the garbage collector should i)
// ignore the reference when decide if the object should be deleted, and
// ii) should update the object to remove such references. This is to
// prevent objects having references to an old resource from being
// deleted during a cluster upgrade.
fqKind := unversioned.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
if err != nil {
return err
}
resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0)
if err != nil {
return err
}
//***获取资源的owner***//
//***如果能获取到owner,则该资源不删除***//
owner, err := client.Resource(resource, item.identity.Namespace).Get(reference.Name)
if err == nil {
if owner.GetUID() != reference.UID {
glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
gc.absentOwnerCache.Add(reference.UID)
continue
}
glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
return nil
} else if errors.IsNotFound(err) {
gc.absentOwnerCache.Add(reference.UID)
glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
} else {
return err
}
}
//***如果所有的owner都不存在,则把资源回收删除***//
glog.V(2).Infof("none of object %s's owners exist any more, will garbage collect it", item.identity)
return gc.deleteObject(item.identity)
}

orphanFinalizer()

orphanFinalizer()会消费orphanQueue中的object,然后调用ophanDependents()删除对应dependents的联级关系,最后调用removeOrphanFinalizer()移除该object的”orphan” finalizer,并使用Update()删除该object。关于Update()删除功能,可以看/registry/generic包中的相关代码。
orphanDependents()会把dependent中的ownerReference中对应的owner清除。orphanDependents()是如何用patch实现删除list中的一个值的,还有待详细研究。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//***执行普通删除***//
//***先清理dependents中的联级关系,再通过Update删除资源***//
func (gc *GarbageCollector) orphanFinalizer() {
timedItem, quit := gc.orphanQueue.Get()
if quit {
return
}
defer gc.orphanQueue.Done(timedItem)
owner, ok := timedItem.Object.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object))
}
// we don't need to lock each element, because they never get updated
//***获取owner的dependents列表***//
owner.dependentsLock.RLock()
dependents := make([]*node, 0, len(owner.dependents))
for dependent := range owner.dependents {
dependents = append(dependents, dependent)
}
owner.dependentsLock.RUnlock()
err := gc.orhpanDependents(owner.identity, dependents)
if err != nil {
glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(timedItem)
return
}
// update the owner, remove "orphaningFinalizer" from its finalizers list
//***删除orphaningFinalizer***//
err = gc.removeOrphanFinalizer(owner)
if err != nil {
glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(timedItem)
}
OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}
//***清理dependents的ownerReference***//
//***此处orphanDependents写错了***//
func (gc *GarbageCollector) orhpanDependents(owner objectReference, dependents []*node) error {
var failedDependents []objectReference
var errorsSlice []error
for _, dependent := range dependents {
// the dependent.identity.UID is used as precondition
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, owner.UID, dependent.identity.UID)
_, err := gc.patchObject(dependent.identity, []byte(deleteOwnerRefPatch))
// note that if the target ownerReference doesn't exist in the
// dependent, strategic merge patch will NOT return an error.
if err != nil && !errors.IsNotFound(err) {
errorsSlice = append(errorsSlice, fmt.Errorf("orphaning %s failed with %v", dependent.identity, err))
}
}
if len(failedDependents) != 0 {
return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
}
glog.V(6).Infof("successfully updated all dependents")
return nil
}

removeOrphanFinalizer()

removeOrphanFinalizer()可以删除”orphan” finalizer,然后更新该object,注意更新操作在某些条件是删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//***移除"orphan" finalizer***//
func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
const retries = 5
for count := 0; count < retries; count++ {
ownerObject, err := gc.getObject(owner.identity)
if err != nil {
return fmt.Errorf("cannot finalize owner %s, because cannot get it. The garbage collector will retry later.", owner.identity)
}
accessor, err := meta.Accessor(ownerObject)
if err != nil {
return fmt.Errorf("cannot access the owner object: %v. The garbage collector will retry later.", err)
}
finalizers := accessor.GetFinalizers()
var newFinalizers []string
found := false
for _, f := range finalizers {
if f == api.FinalizerOrphan {
found = true
break
} else {
newFinalizers = append(newFinalizers, f)
}
}
if !found {
glog.V(6).Infof("the orphan finalizer is already removed from object %s", owner.identity)
return nil
}
// remove the owner from dependent's OwnerReferences
ownerObject.SetFinalizers(newFinalizers)
//***更新的时候在一定条件下会删除资源***//
_, err = gc.updateObject(owner.identity, ownerObject)
if err == nil {
return nil
}
if err != nil && !errors.IsConflict(err) {
return fmt.Errorf("cannot update the finalizers of owner %s, with error: %v, tried %d times", owner.identity, err, count+1)
}
// retry if it's a conflict
glog.V(6).Infof("got conflict updating the owner object %s, tried %d times", owner.identity, count+1)
}
return fmt.Errorf("updateMaxRetries(%d) has reached. The garbage collector will retry later for owner %v.", retries, owner.identity)
}

接下来,我们来看下联级关系维护者-Propagator。

Propagator

Propagator维护结构体值间的联级关系,定义在/pkg/controller/garbagecollector/garbagecollector.go中:

1
2
3
4
5
6
7
type Propagator struct {
eventQueue *workqueue.TimedWorkQueue
// uidToNode doesn't require a lock to protect, because only the
// single-threaded Propagator.processEvent() reads/writes it.
uidToNode *concurrentUIDToNode
gc *GarbageCollector
}

结构体间的联级关系是通过node的概念来维护的,node定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// node does not require a lock to protect. The single-threaded
// Propagator.processEvent() is the sole writer of the nodes. The multi-threaded
// GarbageCollector.processItem() reads the nodes, but it only reads the fields
// that never get changed by Propagator.processEvent().
type node struct {
identity objectReference
// dependents will be read by the orphan() routine, we need to protect it with a lock.
dependentsLock sync.RWMutex
dependents map[*node]struct{}
// When processing an Update event, we need to compare the updated
// ownerReferences with the owners recorded in the graph.
owners []metatypes.OwnerReference
}

可以看出,node的dependents字段表示该node的dependents;owners字段表示该node的owners。

insertNode()

insertNode()可以更新node及其联级关系。
insertNode()会调用addDependentToOwners()。addDependentToOwners()会检查该node的owner是否存在,如有不存在的情况,则把该node加入到dirtyQueue中供worker消费。所以dirtyQueue的生产者是insertNode(),消费者是worker()。insertNode()在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//***把资源加入到owners的依赖表中***//
func (p *Propagator) insertNode(n *node) {
p.uidToNode.Write(n)
p.addDependentToOwners(n, n.owners)
}
//***把资源加到owner的dependents list中***//
func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := p.uidToNode.Read(owner.UID)
if !ok {
// Create a "virtual" node in the graph for the owner if it doesn't
// exist in the graph yet. Then enqueue the virtual node into the
// dirtyQueue. The garbage processor will enqueue a virtual delete
// event to delete it from the graph if API server confirms this
// owner doesn't exist.
ownerNode = &node{
identity: objectReference{
OwnerReference: owner,
Namespace: n.identity.Namespace,
},
dependents: make(map[*node]struct{}),
}
glog.V(6).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
p.uidToNode.Write(ownerNode)
//***如果有owner不存在,则把node加入到dirtyQueue做检查***//
//***因为该node可能具备回收的条件***//
p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: ownerNode})
}
ownerNode.addDependent(n)
}
}

processEvent()

processEvent()从eventQueue中取得item并进行处理。处理的过程主要是维护联级关系及在适当的时候调用了insertNode和把object加入到orphanQueue中。所以,processEvent()是orphanQueue的生产者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
//***从eventQueue中获取item并处理***//
func (p *Propagator) processEvent() {
timedItem, quit := p.eventQueue.Get()
if quit {
return
}
defer p.eventQueue.Done(timedItem)
event, ok := timedItem.Object.(*event)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object))
return
}
obj := event.obj
accessor, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return
}
typeAccessor, err := meta.TypeAccessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return
}
glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
// Check if the node already exsits
existingNode, found := p.uidToNode.Read(accessor.GetUID())
switch {
case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
newNode := &node{
identity: objectReference{
OwnerReference: metatypes.OwnerReference{
APIVersion: typeAccessor.GetAPIVersion(),
Kind: typeAccessor.GetKind(),
UID: accessor.GetUID(),
Name: accessor.GetName(),
},
Namespace: accessor.GetNamespace(),
},
dependents: make(map[*node]struct{}),
owners: accessor.GetOwnerReferences(),
}
p.insertNode(newNode)
// the underlying delta_fifo may combine a creation and deletion into one event
if shouldOrphanDependents(event, accessor) {
glog.V(6).Infof("add %s to the orphanQueue", newNode.identity)
//***把object加入到orphanQueue中***//
p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode})
}
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
// caveat: if GC observes the creation of the dependents later than the
// deletion of the owner, then the orphaning finalizer won't be effective.
if shouldOrphanDependents(event, accessor) {
glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity)
p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode})
}
// add/remove owner refs
added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
if len(added) == 0 && len(removed) == 0 {
glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event)
return
}
// update the node itself
existingNode.owners = accessor.GetOwnerReferences()
// Add the node to its new owners' dependent lists.
p.addDependentToOwners(existingNode, added)
// remove the node from the dependent list of node that are no long in
// the node's owners list.
p.removeDependentFromOwners(existingNode, removed)
case event.eventType == deleteEvent:
if !found {
glog.V(6).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
return
}
p.removeNode(existingNode)
existingNode.dependentsLock.RLock()
defer existingNode.dependentsLock.RUnlock()
if len(existingNode.dependents) > 0 {
p.gc.absentOwnerCache.Add(accessor.GetUID())
}
for dep := range existingNode.dependents {
p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep})
}
}
EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime))
}

其他函数

最后来看下shouldOrphanDependents()。

shouldOrphanDependents()

shouldOrphanDependents()检查object是否有”orphan” finalizer,如果存在,则返回true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func shouldOrphanDependents(e *event, accessor meta.Object) bool {
// The delta_fifo may combine the creation and update of the object into one
// event, so we need to check AddEvent as well.
if e.oldObj == nil {
if accessor.GetDeletionTimestamp() == nil {
return false
}
} else {
oldAccessor, err := meta.Accessor(e.oldObj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
return false
}
// ignore the event if it's not updating DeletionTimestamp from non-nil to nil.
if accessor.GetDeletionTimestamp() == nil || oldAccessor.GetDeletionTimestamp() != nil {
return false
}
}
//***如果存在"orphan" finalizer,则返回true***//
finalizers := accessor.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == api.FinalizerOrphan {
return true
}
}
return false
}

总结

整个garbagecollector可以看成是三个channel的交互:
eventQueue:用于存放资源的事件。
生产者:monitor
消费者:Progator的processEvent()

orphanQueue:用于存放需要需要”orphan”操作(即解除联级关系)的object。
生产者:Progator的processEvent()
消费者:GarbageCollector的orphanFinalizer()

dirtyQueue: 用于存放需要owner检查(所有owner都不存在,则删除,即联级删除)的object。
生产者:Progator的insertNode(),在processEvent()中有调用
消费者:GarbageCollector的worker

那么,Pod是如何和ReplicationController关联起来的呢,在replication_controller上会把ReplicationController的信息作为ownerReference加入到Pod中。而整个GarbageCollector也就是通过object的ownerReference来维护整个联级关系表的。