什么是NamespaceController

NamespaceController负责Namespace的删除。我们知道,当我们删除Namespace的时候,Apiserver并不会删除Namespace,仅仅设置了deletionTimestamp字段。所以,整个Namespace的删除工作由NamespaceController完成。NamespaceController会轮询并处理每一个Namespace。

NamespaceController定义

NamespaceController定义在/pkg/controller/namespace/namespace_controller.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// NamespaceController is responsible for performing actions dependent upon a namespace phase.
// It observes namespaces, queues their keys, and lets workers process each queued
// namespace.
type NamespaceController struct {
// kubeClient is the client that purges namespace content; it must have
// list/delete privileges on all content.
kubeClient clientset.Interface
// clientPool manages a pool of dynamic clients; a client for a specific
// group/version/resource is obtained from it during content deletion.
clientPool dynamic.ClientPool
// store holds the namespaces observed by the controller below.
store cache.Store
// controller observes the namespaces and keeps store populated.
controller *cache.Controller
// queue holds the keys of namespaces waiting to be processed by workers.
queue workqueue.RateLimitingInterface
// groupVersionResourcesFn returns the list of preferred group versions and
// their corresponding resource set to consider for namespace deletion.
groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error)
// opCache remembers that a particular operation is not supported for a
// resource, so the dynamic client does not need to attempt it.
opCache *operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller.
finalizerToken api.FinalizerName
}

各字段涵义如下:
kubeClient: 访问Apiserver的客户端,用于清理namespace下的内容(需要对所有资源具有list/delete权限);
clientPool: 用来生成dynamicClient;
store: 存储namespace用,namespace的informer的store返回值;
controller: namespace的informer的controller返回值;
queue: 待处理的namespace队列;

再来看下NamespaceController的生成函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// NewNamespaceController builds a NamespaceController together with the
// namespace informer that feeds its work queue.
//
// kubeClient lists and watches namespaces (and is stored for later content
// purging), clientPool supplies dynamic clients, groupVersionResourcesFn
// returns the resources to consider during deletion, resyncPeriod is handed to
// the informer, and finalizerToken is the finalizer this controller manages.
func NewNamespaceController(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
	resyncPeriod time.Duration,
	finalizerToken api.FinalizerName) *NamespaceController {
	// Namespace deletion enumerates the server's resources from the discovery
	// document and calls delete on every namespaced one. Discovery does not
	// advertise the supported verbs, so an unsupported LIST/DELETECOLLECTION is
	// normally learned from a 405 response and then cached. Some auth engines
	// answer unknown paths with a 50x instead, so until verbs are available the
	// cache is seeded with well-known resources that support neither operation,
	// rather than probing the server just to be told no.
	notSupported := &operationNotSupportedCache{
		m: make(map[operationKey]bool),
	}
	knownUnsupported := []unversioned.GroupVersionResource{
		{Group: "", Version: "v1", Resource: "bindings"},
	}
	for i := range knownUnsupported {
		gvr := knownUnsupported[i]
		notSupported.setNotSupported(operationKey{op: operationDeleteCollection, gvr: gvr})
		notSupported.setNotSupported(operationKey{op: operationList, gvr: gvr})
	}

	// Build the controller first so the event handlers below can close over it.
	nc := &NamespaceController{
		kubeClient:              kubeClient,
		clientPool:              clientPool,
		queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
		groupVersionResourcesFn: groupVersionResourcesFn,
		opCache:                 notSupported,
		finalizerToken:          finalizerToken,
	}
	if kubeClient != nil {
		if kubeClient.Core().RESTClient().GetRateLimiter() != nil {
			metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.Core().RESTClient().GetRateLimiter())
		}
	}

	// Source of namespace objects for the informer.
	namespaceSource := &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			return kubeClient.Core().Namespaces().List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			return kubeClient.Core().Namespaces().Watch(options)
		},
	}
	// Both additions and updates are funneled into the work queue.
	enqueueOnEvent := cache.ResourceEventHandlerFuncs{
		AddFunc: func(added interface{}) {
			nc.enqueueNamespace(added.(*api.Namespace))
		},
		UpdateFunc: func(_, updated interface{}) {
			nc.enqueueNamespace(updated.(*api.Namespace))
		},
	}

	// The informer yields the backing store and the controller that fills it.
	nc.store, nc.controller = cache.NewInformer(
		namespaceSource,
		&api.Namespace{},
		resyncPeriod,
		enqueueOnEvent,
	)
	return nc
}

生成函数比较简单,使用enqueueNamespace()方法作为Informer的处理函数,即Informer中的controller获取到item后,在将其存储到store的同时,也会调用enqueueNamespace()来处理。关于Informer,详见https://fankangbest.github.io/2017/08/25/cache%E8%A7%A3%E8%AF%BB(%E4%B8%89)-Controller-v1-5-2/

queue的生产者

queue的生产者为enqueueNamespace(),定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
// enqueueNamespace schedules the namespace behind obj for processing by adding
// its key to the work queue. It is the handler invoked by the namespace
// informer for add and update events.
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
	key, keyErr := controller.KeyFunc(obj)
	if keyErr == nil {
		// Hold the key back for the grace period so that HA API servers can
		// observe the namespace deletion and HA etcd servers can observe
		// last-minute object creations inside the namespace.
		nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
		return
	}
	glog.Errorf("Couldn't get key for object %+v: %v", obj, keyErr)
}

可以看出,enqueueNamespace()会通过AddAfter()在延迟namespaceDeletionGracePeriod之后,把namespace的key放入到queue中。

queue的消费者

queue由worker()消费。worker()定义在/pkg/controller/namespace/namespace_controller.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// worker drains the queue of namespace keys until the queue reports shutdown.
// Each namespace can be in the queue at most once, and the system ensures that
// no two workers process the same namespace at the same time.
func (nm *NamespaceController) worker() {
	// processNext handles one key and reports whether the queue has shut down.
	processNext := func() bool {
		item, shuttingDown := nm.queue.Get()
		if shuttingDown {
			return true
		}
		// Mark the key as done however the sync below turns out.
		defer nm.queue.Done(item)

		switch syncErr := nm.syncNamespaceFromKey(item.(string)).(type) {
		case nil:
			// Success: drop any retry bookkeeping for this key.
			nm.queue.Forget(item)
		case *contentRemainingError:
			// Content is still going away; look again after roughly half of
			// the estimated time (always at least one second).
			delay := syncErr.Estimate/2 + 1
			glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", item, delay)
			nm.queue.AddAfter(item, time.Duration(delay)*time.Second)
		default:
			// Retry through the rate limiter instead of waiting for a full resync.
			nm.queue.AddRateLimited(item)
			utilruntime.HandleError(syncErr)
		}
		return false
	}

	for !processNext() {
	}
}

worker()会从queue中获取一个key,然后使用syncNamespaceFromKey()对该key所对应的Namespace进行处理。

先来看下worker()是如何启动的。

Run()

Run()负责启动NamespaceController的controller和worker。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Run starts the namespace informer and the requested number of workers, then
// blocks until stopCh is closed, after which the work queue is shut down.
func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// The informer controller observes namespaces until stopCh is closed.
	go nm.controller.Run(stopCh)

	// Launch the workers; each is driven by wait.Until with a one-second period.
	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(nm.worker, time.Second, stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down NamespaceController")
	nm.queue.ShutDown()
}

再来看Namespace的处理函数syncNamespaceFromKey()。

syncNamespaceFromKey()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// syncNamespaceFromKey looks for a namespace with the specified key in the
// controller's store and synchronizes it via syncNamespace.
//
// It returns nil when the key is no longer present in the store (nothing is
// left to do) or when syncNamespace succeeds. A store lookup failure or a
// syncNamespace error is returned to the caller.
func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
	startTime := time.Now()
	// The log call is wrapped in a closure on purpose: arguments of a deferred
	// call are evaluated when the defer statement executes, which would make
	// the reported duration always (close to) zero.
	defer func() {
		glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime))
	}()

	obj, exists, err := nm.store.GetByKey(key)
	// Check the error before the exists flag: after a failed lookup the flag is
	// not meaningful, and treating it as "deleted" would silently drop the error.
	if err != nil {
		glog.Errorf("Unable to retrieve namespace %v from store: %v", key, err)
		nm.queue.Add(key)
		return err
	}
	if !exists {
		glog.Infof("Namespace has been deleted %v", key)
		return nil
	}

	namespace := obj.(*api.Namespace)
	// Clean up the resources of a namespace that is being deleted.
	return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken)
}

syncNamespaceFromKey()依据key从store中获取具体的namespace,然后调用syncNamespace()对Namespace进行处理。

syncNamespace()

syncNamespace()定义在/pkg/controller/namespace/namespace_controller_utils.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// syncNamespace drives the deletion of a namespace that has been marked for
// deletion.
//
// The flow is:
//  1. Do nothing if the namespace has no DeletionTimestamp.
//  2. Re-read the namespace from the API server and refresh its status; a
//     not-found error at either step means it is already gone.
//  3. If the namespace is already finalized, delete it.
//  4. Otherwise delete all content in the namespace. If content is still
//     terminating, return a *contentRemainingError carrying the estimate.
//  5. Remove this controller's finalizer (finalizerToken) and, if the
//     namespace is now finalized, delete it.
//
// Any other failure is returned unchanged so the caller can retry.
func syncNamespace(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	opCache *operationNotSupportedCache,
	groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
	namespace *api.Namespace,
	finalizerToken api.FinalizerName,
) error {
	// Not marked for deletion: nothing to do.
	if namespace.DeletionTimestamp == nil {
		return nil
	}

	// Multiple controllers may edit a namespace during termination, so fetch
	// the latest state before proceeding. If the namespace was deleted
	// already, there is nothing to do.
	namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	glog.V(5).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, finalizerToken)

	// Ensure that the status is up to date on the namespace. A not-found
	// error is taken to mean the namespace is truly gone.
	namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// The latest view of the namespace asserts that it is no longer deleting.
	if namespace.DeletionTimestamp.IsZero() {
		return nil
	}

	// If the namespace is already finalized, delete it right away.
	if finalized(namespace) {
		return deleteFinalizedNamespace(kubeClient, namespace)
	}

	// There may still be content for us to remove.
	groupVersionResources, err := groupVersionResourcesFn()
	if err != nil {
		return err
	}

	// Delete the resources that live inside the namespace.
	estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
	if err != nil {
		return err
	}
	if estimate > 0 {
		return &contentRemainingError{estimate}
	}

	// We have removed the content, so remove our finalizer from the namespace.
	result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc(finalizerToken))
	if err != nil {
		// In normal practice this should not be possible, but if a deployment
		// runs two controllers for namespace deletion that share a common
		// finalizer token, a not-found can occur because the other controller
		// has already finished the delete.
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// Now check whether all finalizers have reported, i.e. whether the
	// namespace itself can be deleted.
	if finalized(result) {
		return deleteFinalizedNamespace(kubeClient, result)
	}
	return nil
}

// deleteFinalizedNamespace deletes ns, treating a not-found response as
// success. When ns carries a UID the delete is guarded by a UID precondition,
// so that a namespace re-created under the same name in the meantime is not
// removed by mistake.
func deleteFinalizedNamespace(kubeClient clientset.Interface, ns *api.Namespace) error {
	var opts *api.DeleteOptions
	uid := ns.UID
	if len(uid) > 0 {
		opts = &api.DeleteOptions{Preconditions: &api.Preconditions{UID: &uid}}
	}
	err := kubeClient.Core().Namespaces().Delete(ns.Name, opts)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	return nil
}

syncNamespace()的逻辑如下:

  1. 如果Namespace的DeletionTimestamp未设置,则不对该Namespace做任何处理;
  2. 如果Namespace不存在,则不做任何处理;
  3. 检查finalizer,如果该namespace已经finalized(其finalizer已全部被移除),则直接删除namespace;
  4. 删除Namespace下的资源,然后移除Namespace中由本controller管理的finalizer(即finalizerToken,通常为kubernetes);
  5. 再次检查finalizer,如果已全部被移除,则删除namespace。

可以看出,只有Namespace中没有任何finalizer的情况下才会删除Namespace,而只有该Namespace下所有资源都被清除时,才会移除kubernetes这个finalizer。这就是finalizer机制。

deleteAllContent()

deleteAllContent()可以删除Namespace下的所有资源,定义在/pkg/controller/namespace/namespace_controller_utils.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// deleteAllContent uses dynamic clients to delete, inside the given namespace,
// every resource identified in groupVersionResources. The slice is sorted in
// place before processing.
//
// It returns the largest estimate reported for any single resource, i.e. an
// estimate of the time remaining before the remaining resources are deleted.
// If the estimate is greater than zero, not all resources are guaranteed to be
// gone. Processing stops at the first error, which is returned together with
// the estimate accumulated so far.
func deleteAllContent(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	opCache *operationNotSupportedCache,
	groupVersionResources []unversioned.GroupVersionResource,
	namespace string,
	namespaceDeletedAt unversioned.Time,
) (int64, error) {
	var longest int64
	glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, gvrs: %v", namespace, groupVersionResources)

	// Process resources in the priority order defined by
	// sortableGroupVersionResources (intended to delete pods last).
	sort.Sort(sortableGroupVersionResources(groupVersionResources))

	for i := range groupVersionResources {
		// Remove everything of this one resource type from the namespace.
		current, err := deleteAllContentForGroupVersionResource(kubeClient, clientPool, opCache, groupVersionResources[i], namespace, namespaceDeletedAt)
		switch {
		case err != nil:
			return longest, err
		case current > longest:
			longest = current
		}
	}

	glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v", namespace, longest)
	return longest, nil
}
// deleteAllContentForGroupVersionResource deletes all items of a single
// group/version/resource from the namespace and verifies the outcome.
//
// It first estimates the graceful termination time, then tries to delete the
// whole collection and, when that operation is not supported, lists and
// deletes each item. Finally it lists the collection again: leftover items are
// tolerated when the estimate is non-zero, or when any leftover item carries a
// finalizer (finalizerEstimateSeconds is returned in that case); otherwise an
// error is returned. The estimate is returned alongside any error.
func deleteAllContentForGroupVersionResource(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	opCache *operationNotSupportedCache,
	gvr unversioned.GroupVersionResource,
	namespace string,
	namespaceDeletedAt unversioned.Time,
) (int64, error) {
	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)

	// Estimate how long the deletion will take; this matters for objects that
	// support graceful delete.
	remaining, err := estimateGracefulTermination(kubeClient, gvr, namespace, namespaceDeletedAt)
	if err != nil {
		glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
		return remaining, err
	}
	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, remaining)

	// Obtain a dynamic client for this group version resource.
	client, err := clientPool.ClientForGroupVersionResource(gvr)
	if err != nil {
		glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
		return remaining, err
	}

	// Prefer deleting the entire collection in one request.
	collectionDeleted, err := deleteCollection(client, opCache, gvr, namespace)
	if err != nil {
		return remaining, err
	}
	if !collectionDeleted {
		// Collection delete is not supported, so list and delete item by item.
		if err = deleteEachItem(client, opCache, gvr, namespace); err != nil {
			return remaining, err
		}
	}

	// Verify that nothing is left. Leftover items are not an error when the
	// local estimate is non-zero.
	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
	leftovers, listSupported, err := listCollection(client, opCache, gvr, namespace)
	if err != nil {
		glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
		return remaining, err
	}
	if !listSupported {
		return remaining, nil
	}
	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining - namespace: %s, gvr: %v, items: %v", namespace, gvr, len(leftovers.Items))

	if len(leftovers.Items) == 0 || remaining != int64(0) {
		return remaining, nil
	}

	// An item with a finalizer is a normal condition; use a default estimate
	// to give garbage collection time to complete.
	for i := range leftovers.Items {
		item := leftovers.Items[i]
		if len(item.GetFinalizers()) > 0 {
			glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining with finalizers - namespace: %s, gvr: %v, finalizers: %v", namespace, gvr, item.GetFinalizers())
			return finalizerEstimateSeconds, nil
		}
	}

	// No leftover item has a finalizer, so it should already have been deleted.
	return remaining, fmt.Errorf("unexpected items still remain in namespace: %s for gvr: %v", namespace, gvr)
}

deleteAllContent()会按资源清空整个Namespace。而Namespace下有多少种资源是通过discoveryClient的ServerPreferredNamespacedResources()获取的,discoveryClient还有待详细研究。

分析完毕。