什么是NamespaceController NamespaceController负责Namespace的删除。我们知道,当我们删除Namespace的时候,Apiserver并不会删除Namespace,仅仅设置了deletionTimestamp字段。所以,整个Namespace的删除工作由NamespaceController完成。NamespaceController会轮询并处理每一个Namespace。
NamespaceController定义 NamespaceController定义在/pkg/controller/namespace/namespace_controller.go中:
// NamespaceController watches namespaces through an informer and processes
// their keys from a rate-limited work queue; the actual clean-up of a
// namespace that is being deleted happens in syncNamespace.
type NamespaceController struct {
// kubeClient is the typed API client used to list, watch, get and delete namespaces.
kubeClient clientset.Interface
// clientPool hands out a dynamic client for a given GroupVersionResource.
clientPool dynamic.ClientPool
// store is the informer's local store of namespaces; it is read by key when syncing.
store cache.Store
// controller is the informer controller returned by cache.NewInformer; Run starts it.
controller *cache.Controller
// queue holds the keys of namespaces waiting to be processed by the workers.
queue workqueue.RateLimitingInterface
// groupVersionResourcesFn returns the resources whose content is removed from a
// namespace that is being deleted.
groupVersionResourcesFn func () ([]unversioned.GroupVersionResource, error)
// opCache remembers operations that are known to be unsupported for a resource.
// NOTE(review): presumably so the dynamic client does not retry them - confirm.
opCache *operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller.
finalizerToken api .FinalizerName
}
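各字段涵义如下: kubeClient: 访问Apiserver的客户端,用于对Namespace进行List/Watch/Get/Delete等操作; clientPool: 用来按GroupVersionResource生成dynamicClient; store: 存储namespace用,即namespace的informer返回的store; controller: namespace的informer返回的controller; queue: 待处理的namespace的key队列; groupVersionResourcesFn: 返回需要清理的资源类型(GroupVersionResource)列表的函数; opCache: 记录哪些资源不支持某种操作(如deleteCollection、list)的缓存; finalizerToken: 该controller所管理的finalizer名称。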
再来看下NamespaceController的生成函数:
// NewNamespaceController builds a NamespaceController and wires it to a
// namespace informer.
//
// kubeClient is used for the namespace list/watch and is stored on the
// controller; clientPool, groupVersionResourcesFn and finalizerToken are stored
// as-is; resyncPeriod is handed to the informer. Every namespace add or update
// event seen by the informer is passed to enqueueNamespace. No delete handler
// is registered.
func NewNamespaceController(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
	resyncPeriod time.Duration,
	finalizerToken api.FinalizerName) *NamespaceController {

	// Pre-mark deletecollection and list as unsupported for the resources
	// below so they are recorded in the cache before any sync runs.
	notSupported := &operationNotSupportedCache{m: make(map[operationKey]bool)}
	skipped := []unversioned.GroupVersionResource{
		{Group: "", Version: "v1", Resource: "bindings"},
	}
	for _, gvr := range skipped {
		notSupported.setNotSupported(operationKey{op: operationDeleteCollection, gvr: gvr})
		notSupported.setNotSupported(operationKey{op: operationList, gvr: gvr})
	}

	nc := &NamespaceController{
		kubeClient:              kubeClient,
		clientPool:              clientPool,
		queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
		groupVersionResourcesFn: groupVersionResourcesFn,
		opCache:                 notSupported,
		finalizerToken:          finalizerToken,
	}

	// Track rate limiter usage only when the client actually has a limiter.
	if kubeClient != nil {
		if limiter := kubeClient.Core().RESTClient().GetRateLimiter(); limiter != nil {
			metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", limiter)
		}
	}

	namespaceListWatch := &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			return kubeClient.Core().Namespaces().List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			return kubeClient.Core().Namespaces().Watch(options)
		},
	}

	// Both handlers do the same thing: assert the object is a namespace and
	// queue it.
	enqueue := func(obj interface{}) {
		nc.enqueueNamespace(obj.(*api.Namespace))
	}
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: enqueue,
		UpdateFunc: func(_, updated interface{}) {
			enqueue(updated)
		},
	}

	nc.store, nc.controller = cache.NewInformer(namespaceListWatch, &api.Namespace{}, resyncPeriod, handlers)
	return nc
}
生成函数比较简单,使用enqueueNamespace()方法作为Informer的处理函数,即Informer中controller获取到的item在存储到store中的同时,也会调用enqueueNamespace()来处理(这里只注册了AddFunc和UpdateFunc,没有注册DeleteFunc)。关于Informer,详见https://fankangbest.github.io/2017/08/25/cache%E8%A7%A3%E8%AF%BB(%E4%B8%89)-Controller-v1-5-2/ 。
queue的生产者 queue的生产者为enqueueNamespace(),定义如下:
// enqueueNamespace computes the queue key for obj and schedules it on the work
// queue after namespaceDeletionGracePeriod. If no key can be derived, the
// failure is logged and the object is dropped.
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
	namespaceKey, keyErr := controller.KeyFunc(obj)
	if keyErr == nil {
		nm.queue.AddAfter(namespaceKey, namespaceDeletionGracePeriod)
		return
	}
	glog.Errorf("Couldn't get key for object %+v: %v", obj, keyErr)
}
可以看出,enqueueNamespace()会通过AddAfter()把namespace的key在延迟namespaceDeletionGracePeriod之后放入到queue中。
queue的消费者 queue由worker()消费。worker()定义在/pkg/controller/namespace/namespace_controller.go中:
// worker drains the namespace queue until the queue reports shutdown.
//
// For each key it calls syncNamespaceFromKey and then:
//   - on success, forgets the key;
//   - on a *contentRemainingError, re-adds the key after Estimate/2 + 1 seconds;
//   - on any other error, re-adds the key rate-limited and reports the error.
func (nm *NamespaceController) worker() {
	// processNext handles a single queue item and returns true once the queue
	// has been shut down.
	processNext := func() bool {
		item, shuttingDown := nm.queue.Get()
		if shuttingDown {
			return true
		}
		// Done must run after any Forget/AddAfter/AddRateLimited below.
		defer nm.queue.Done(item)

		switch syncErr := nm.syncNamespaceFromKey(item.(string)).(type) {
		case nil:
			nm.queue.Forget(item)
		case *contentRemainingError:
			delaySeconds := syncErr.Estimate/2 + 1
			glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", item, delaySeconds)
			nm.queue.AddAfter(item, time.Duration(delaySeconds)*time.Second)
		default:
			nm.queue.AddRateLimited(item)
			utilruntime.HandleError(syncErr)
		}
		return false
	}

	for !processNext() {
	}
}
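worker()会从queue中获取一个key,然后使用syncNamespaceFromKey()对该key所对应的Namespace进行处理。处理成功则对该key调用Forget();如果返回contentRemainingError,则在Estimate/2+1秒之后把key重新放入queue;如果是其他错误,则通过AddRateLimited()限速重新入队。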
先来看下worker()是如何启动的。
Run() Run()负责启动NamespaceController的controller和worker。
// Run starts the informer controller and the requested number of worker
// goroutines, then blocks until stopCh is closed, at which point it shuts the
// work queue down.
func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	go nm.controller.Run(stopCh)

	// Each worker is restarted by wait.Until with a one second period until
	// stopCh closes.
	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(nm.worker, time.Second, stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down NamespaceController")
	nm.queue.ShutDown()
}
再来看Namespace的处理函数syncNamespaceFromKey()。
syncNamespaceFromKey()
// syncNamespaceFromKey looks the namespace for key up in the informer store and
// hands it to syncNamespace.
//
// It returns nil when the key is no longer in the store, the store error when
// the lookup fails, and otherwise whatever syncNamespace returns.
func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
	startTime := time.Now()
	// The log call is wrapped in a closure so the elapsed time is computed when
	// the function returns; arguments of a plain deferred call are evaluated at
	// the defer statement and would always report a near-zero duration.
	defer func() {
		glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime))
	}()

	obj, exists, err := nm.store.GetByKey(key)
	// Check the lookup error before the existence flag, so a failed lookup is
	// not misreported as a deleted namespace and silently dropped.
	if err != nil {
		glog.Errorf("Unable to retrieve namespace %v from store: %v", key, err)
		nm.queue.Add(key)
		return err
	}
	if !exists {
		glog.Infof("Namespace has been deleted %v", key)
		return nil
	}

	namespace := obj.(*api.Namespace)
	return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken)
}
syncNamespaceFromKey()依据key从store中获取具体的namespace,然后调用syncNamespace()对Namespace进行处理。
syncNamespace() syncNamespace()定义在/pkg/controller/namespace/namespace_controller_utils.go中:
// syncNamespace moves a namespace that is being deleted towards final removal.
//
// Steps, in order:
//  1. Namespaces without a DeletionTimestamp are ignored.
//  2. The namespace is re-read from the API server; a NotFound result ends the sync.
//  3. The namespace status is updated via updateNamespaceStatusFunc.
//  4. If the namespace is already finalized, it is deleted.
//  5. Otherwise all content is deleted; if the returned estimate is positive a
//     *contentRemainingError carrying that estimate is returned.
//  6. The finalizer identified by finalizerToken is removed, and if the result
//     is finalized the namespace is deleted.
//
// NotFound errors from the API server are treated as success throughout.
func syncNamespace(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	opCache *operationNotSupportedCache,
	groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error),
	namespace *api.Namespace,
	finalizerToken api.FinalizerName,
) error {
	if namespace.DeletionTimestamp == nil {
		return nil
	}

	// Work on a fresh copy from the API server rather than the object passed in.
	namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	glog.V(5).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, finalizerToken)

	namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// The freshly read object may no longer be marked for deletion.
	if namespace.DeletionTimestamp.IsZero() {
		return nil
	}

	if finalized(namespace) {
		return deleteFinalizedNamespace(kubeClient, namespace)
	}

	groupVersionResources, err := groupVersionResourcesFn()
	if err != nil {
		return err
	}

	estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
	if err != nil {
		return err
	}
	if estimate > 0 {
		return &contentRemainingError{estimate}
	}

	result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc(finalizerToken))
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	if finalized(result) {
		return deleteFinalizedNamespace(kubeClient, result)
	}
	return nil
}

// deleteFinalizedNamespace issues the delete call for a finalized namespace.
// When the namespace has a UID, the delete carries a UID precondition so that a
// different namespace created under the same name is not affected. A NotFound
// response is treated as success.
func deleteFinalizedNamespace(kubeClient clientset.Interface, namespace *api.Namespace) error {
	var opts *api.DeleteOptions
	uid := namespace.UID
	if len(uid) > 0 {
		opts = &api.DeleteOptions{Preconditions: &api.Preconditions{UID: &uid}}
	}
	err := kubeClient.Core().Namespaces().Delete(namespace.Name, opts)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	return nil
}
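syncNamespace()的逻辑如下:
如果Namespace的DeletionTimestamp未设置,则不对该Namespace做任何处理;
从Apiserver重新获取Namespace,如果Namespace已不存在,则不做任何处理;
通过updateNamespaceStatusFunc更新Namespace的状态;
检查kubernetes这个finalizer(上文所说的“标签”实际上是finalizer,而不是label),如果该Namespace的finalizer已经被移除(finalized()返回true),则直接删除Namespace;
删除Namespace下的资源;如果还有资源未清理完(estimate大于0),则返回contentRemainingError,等待下次重试;
资源清理完成后,移除Namespace中由finalizerToken指定的finalizer(即kubernetes);
再次检查finalizer,如果已不存在,则删除Namespace。
可以看出,只有Namespace中没有kubernetes这个finalizer的情况下才会删除Namespace,而只有该Namespace下所有资源都被清除时,才会移除该finalizer。这就是finalizer机制。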
deleteAllContent() deleteAllContent()可以删除Namespace下的所有资源,定义在/pkg/controller/namespace/namespace_controller_utils.go中:
// deleteAllContent deletes the content of namespace for every entry in
// groupVersionResources and returns the largest estimate reported by any of
// them.
//
// groupVersionResources is sorted in place (ordering defined by
// sortableGroupVersionResources) before processing. Processing stops at the
// first error, which is returned together with the largest estimate seen so far.
func deleteAllContent(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	opCache *operationNotSupportedCache,
	groupVersionResources []unversioned.GroupVersionResource,
	namespace string,
	namespaceDeletedAt unversioned.Time,
) (int64, error) {
	var maxEstimate int64
	glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, gvrs: %v", namespace, groupVersionResources)

	sort.Sort(sortableGroupVersionResources(groupVersionResources))
	for idx := range groupVersionResources {
		current, err := deleteAllContentForGroupVersionResource(kubeClient, clientPool, opCache, groupVersionResources[idx], namespace, namespaceDeletedAt)
		if err != nil {
			return maxEstimate, err
		}
		if current > maxEstimate {
			maxEstimate = current
		}
	}

	glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v", namespace, maxEstimate)
	return maxEstimate, nil
}
// deleteAllContentForGroupVersionResource removes every object of the given
// resource from namespace and returns an estimate for that resource.
//
// It first obtains an estimate from estimateGracefulTermination, then tries a
// collection delete and falls back to deleting items one by one when the
// collection delete is reported as unsupported. Afterwards it lists the
// resource to verify nothing is left:
//   - if listing is unsupported, or nothing remains, or the estimate is
//     non-zero, the estimate is returned;
//   - if items remain with a zero estimate and one of them has finalizers,
//     finalizerEstimateSeconds is returned;
//   - if items remain with a zero estimate and none has finalizers, an error
//     is returned.
func deleteAllContentForGroupVersionResource(
	kubeClient clientset.Interface,
	clientPool dynamic.ClientPool,
	opCache *operationNotSupportedCache,
	gvr unversioned.GroupVersionResource,
	namespace string,
	namespaceDeletedAt unversioned.Time,
) (int64, error) {
	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)

	estimate, err := estimateGracefulTermination(kubeClient, gvr, namespace, namespaceDeletedAt)
	if err != nil {
		glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
		return estimate, err
	}
	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)

	client, err := clientPool.ClientForGroupVersionResource(gvr)
	if err != nil {
		glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
		return estimate, err
	}

	// Prefer a single collection delete; delete item by item only when the
	// collection delete is reported as unsupported.
	bulkDeleteSupported, err := deleteCollection(client, opCache, gvr, namespace)
	if err != nil {
		return estimate, err
	}
	if !bulkDeleteSupported {
		if itemErr := deleteEachItem(client, opCache, gvr, namespace); itemErr != nil {
			return estimate, itemErr
		}
	}

	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
	remaining, canList, err := listCollection(client, opCache, gvr, namespace)
	if err != nil {
		glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
		return estimate, err
	}
	if !canList {
		return estimate, nil
	}
	glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining - namespace: %s, gvr: %v, items: %v", namespace, gvr, len(remaining.Items))

	// Leftover items are only a problem when the estimate is zero.
	if len(remaining.Items) == 0 || estimate != int64(0) {
		return estimate, nil
	}

	for _, item := range remaining.Items {
		itemFinalizers := item.GetFinalizers()
		if len(itemFinalizers) == 0 {
			continue
		}
		glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining with finalizers - namespace: %s, gvr: %v, finalizers: %v", namespace, gvr, itemFinalizers)
		return finalizerEstimateSeconds, nil
	}
	return estimate, fmt.Errorf("unexpected items still remain in namespace: %s for gvr: %v", namespace, gvr)
}
deleteAllContent()会按资源清空整个Namespace。而Namespace下有多少种资源是通过discoveryClient的ServerPreferredNamespacedResources()获取的,discoveryClient还有待详细研究。
分析完毕。