本次分析将介绍ResourceQuotaController。之前我们已经介绍过Kubernetes如何管理配额,但当发生删除操作时,无需经过配额比较,资源会被直接删除;此外,还可能出现系统记录的配额与实际使用情况不一致的情况。这些都需要一种机制来矫正,而这个机制就是ResourceQuotaController。
ResourceQuotaController
ResourceQuotaController定义在/pkg/controller/resourcequota/resource_quota_controller.go中:
// ResourceQuotaController keeps the status of ResourceQuota objects in sync
// with the usage observed in their namespaces.
type ResourceQuotaController struct {
// client used to list/watch ResourceQuota objects and to update their status
kubeClient clientset.Interface
// local store of ResourceQuota objects; looked up by key and by the "namespace" index
rqIndexer cache.Indexer
// controller that feeds ResourceQuota changes into rqIndexer
rqController *cache.Controller
// keys of ResourceQuota objects waiting to be synced
queue workqueue.RateLimitingInterface
// keys of ResourceQuota objects whose status is missing hard limits or usage;
// served by its own set of workers
missingUsageQueue workqueue.RateLimitingInterface
// function invoked by the workers for each dequeued key
syncHandler func (key string ) error
// function that controls full recalculation of quota usage
resyncPeriod controller .ResyncPeriodFunc
// knows how to calculate usage
registry quota .Registry
// controllers monitoring to notify for replenishment
replenishmentControllers []cache .ControllerInterface
}
具体字段含义如下:
kubeClient: kubernetes的client;
rqIndexer:存储resource quota用;
rqController: resource quota的controller;
queue: 缓存待处理的quota的key;
missingUsageQueue: 缓存Used字段未初始化的quota;
syncHandler:处理key所对应的quota的方法;
registry: quota的registry,可以使用registry.Evaluators()来获取所有的evaluator;
replenishmentControllers: 处理删除,更新后需要进行quota更新的操作。
可以通过NewResourceQuotaController()生成ResourceQuotaController:
// NewResourceQuotaController assembles a ResourceQuotaController from options.
//
// It creates the primary and missing-usage work queues, an indexer/informer
// pair that watches ResourceQuota objects in every namespace, and one
// replenishment controller per entry in options.GroupKindsToReplenish.
// A kind whose replenishment controller cannot be built is logged and
// skipped; its changes are then only picked up by the periodic full resync.
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
	rq := &ResourceQuotaController{
		kubeClient:               options.KubeClient,
		queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
		missingUsageQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
		resyncPeriod:             options.ResyncPeriod,
		registry:                 options.Registry,
		replenishmentControllers: []cache.ControllerInterface{},
	}

	// Track client rate limiter usage only when a client with a limiter is present.
	if options.KubeClient != nil {
		if limiter := options.KubeClient.Core().RESTClient().GetRateLimiter(); limiter != nil {
			metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", limiter)
		}
	}

	rq.syncHandler = rq.syncResourceQuotaFromKey

	// List/watch ResourceQuota objects across all namespaces.
	quotaListWatch := &cache.ListWatch{
		ListFunc: func(listOptions api.ListOptions) (runtime.Object, error) {
			return rq.kubeClient.Core().ResourceQuotas(api.NamespaceAll).List(listOptions)
		},
		WatchFunc: func(watchOptions api.ListOptions) (watch.Interface, error) {
			return rq.kubeClient.Core().ResourceQuotas(api.NamespaceAll).Watch(watchOptions)
		},
	}

	quotaHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc: rq.addQuota,
		UpdateFunc: func(old, cur interface{}) {
			previous := old.(*api.ResourceQuota)
			current := cur.(*api.ResourceQuota)
			// Only a change of the hard limits in the spec triggers a sync here.
			if !quota.Equals(current.Spec.Hard, previous.Spec.Hard) {
				rq.addQuota(current)
			}
		},
		DeleteFunc: rq.enqueueResourceQuota,
	}

	rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(
		quotaListWatch,
		&api.ResourceQuota{},
		rq.resyncPeriod(),
		quotaHandlers,
		cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
	)

	for _, groupKind := range options.GroupKindsToReplenish {
		replenisher, err := options.ControllerFactory.NewController(&ReplenishmentControllerOptions{
			GroupKind:         groupKind,
			ResyncPeriod:      options.ReplenishmentResyncPeriod,
			ReplenishmentFunc: rq.replenishQuota,
		})
		if err != nil {
			glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKind, err)
			continue
		}
		rq.replenishmentControllers = append(rq.replenishmentControllers, replenisher)
	}
	return rq
}
NewResourceQuotaController()流程如下:
生成ResourceQuotaController;
填充ResourceQuotaController的syncHandler为rq.syncResourceQuotaFromKey;
生成quota的IndexerInformer,并把indexer和controller赋值给rq.rqIndexer, rq.rqController; 3.1 AddFunc为rq.addQuota(); 3.2 UpdateFunc仅在新旧quota的Spec.Hard不一致时才调用rq.addQuota(); 3.3 DeleteFunc为rq.enqueueResourceQuota();
生成replenishmentControllers(依据传入的资源列表生成多个replenishmentController)。
ResourceQuotaController::Run()
每个controller必须定义Run()方法,ResourceQuotaController也不例外:
// Run starts the controller and blocks until stopCh is closed.
//
// It launches the ResourceQuota informer, every replenishment controller,
// `workers` goroutines for each of the two work queues, and a periodic
// task that re-enqueues every known quota for a full recalculation.
// When stopCh is closed both work queues are shut down so that every
// worker blocked in Get() is released and can exit.
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	go rq.rqController.Run(stopCh)
	for _, replenishmentController := range rq.replenishmentControllers {
		go replenishmentController.Run(stopCh)
	}

	// Each queue gets its own set of workers.
	for i := 0; i < workers; i++ {
		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
	}

	// Periodically enqueue every quota for a full usage recalculation.
	go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)

	<-stopCh
	glog.Infof("Shutting down ResourceQuotaController")
	rq.queue.ShutDown()
	// The worker loop only returns once its queue reports shutdown, so the
	// second queue must be shut down as well; otherwise its workers would
	// stay blocked in Get() after the controller has stopped.
	rq.missingUsageQueue.ShutDown()
}
Run()的流程如下:
启动rqController,这些关于quota的变化都会同步到rqIndexer中;
启动rq.replenishmentControllers中的replenishmentController;
启动workers个worker消费queue中的内容;
启动workers个worker消费missingUsageQueue中的内容;
启动定时检查所有quota的流程;
监听stopCh。
所以,只有给stopCh信号,Run()才会退出。
ResourceQuotaController::worker()
再来看下worker:
// worker returns a function that drains the given queue until the queue is
// shut down. Each key is passed to rq.syncHandler; on success the key's
// rate-limit history is forgotten, on failure the error is reported and the
// key is re-added with rate limiting.
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
	// processNext handles a single item and reports whether the queue has shut down.
	processNext := func() bool {
		item, shutdown := queue.Get()
		if shutdown {
			return true
		}
		// Always mark the item as done, whatever the outcome of the sync.
		defer queue.Done(item)

		if err := rq.syncHandler(item.(string)); err != nil {
			utilruntime.HandleError(err)
			queue.AddRateLimited(item)
			return false
		}
		queue.Forget(item)
		return false
	}

	return func() {
		for !processNext() {
		}
		glog.Infof("resource quota controller worker shutting down")
	}
}
worker的流程就是从queue中获取key,然后调用syncHandler()处理该key,syncHandler()就是rq.syncResourceQuotaFromKey()。
ResourceQuotaController::syncResourceQuotaFromKey()
syncResourceQuotaFromKey()从rqIndexer中通过key获取quota,然后调用syncResourceQuota()处理quota。
// syncResourceQuotaFromKey looks up the ResourceQuota stored under key in
// rq.rqIndexer and synchronizes it via syncResourceQuota.
//
// It returns nil when the quota is no longer in the store (treated as
// deleted), the lookup error when the store cannot be read, and otherwise
// whatever syncResourceQuota returns. The elapsed time is logged at V(4).
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Since(startTime))
	}()

	obj, exists, err := rq.rqIndexer.GetByKey(key)
	// The error must be inspected before `exists`: a failed lookup would
	// otherwise be misreported as a deletion and silently dropped.
	if err != nil {
		glog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
		// NOTE(review): the worker also re-adds the key with rate limiting when
		// an error is returned, so this immediate re-add looks redundant — confirm.
		rq.queue.Add(key)
		return err
	}
	if !exists {
		glog.Infof("Resource quota has been deleted %v", key)
		return nil
	}

	// Work on a copy so the object held by the store is not modified.
	quota := *obj.(*api.ResourceQuota)
	return rq.syncResourceQuota(quota)
}
ResourceQuotaController::syncResourceQuota()
// syncResourceQuota recalculates the usage for resourceQuota's namespace and
// writes a new status when anything changed.
//
// A status update is sent when the spec hard limits differ from the status
// hard limits, when the status has no hard limits or no usage yet, or when
// the freshly computed usage differs from the recorded usage. It returns the
// error from the usage calculation or from the status update, if any.
func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota api.ResourceQuota) (err error) {
	status := resourceQuota.Status

	// Limits changed, or the status has never been populated.
	needsUpdate := !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, status.Hard) ||
		status.Hard == nil || status.Used == nil

	// Start from the previously recorded usage, if there is any.
	merged := api.ResourceList{}
	if status.Used != nil {
		merged = quota.Add(api.ResourceList{}, status.Used)
	}
	hardLimits := quota.Add(api.ResourceList{}, resourceQuota.Spec.Hard)

	observed, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)
	if err != nil {
		return err
	}
	// Freshly observed values override the recorded ones.
	for name, quantity := range observed {
		merged[name] = quantity
	}

	// Restrict the reported usage to the resources named in the hard limits.
	used := quota.Mask(merged, quota.ResourceNames(hardLimits))

	// Build the status update; ResourceVersion is carried over from the input object.
	usage := api.ResourceQuota{
		ObjectMeta: api.ObjectMeta{
			Name:            resourceQuota.Name,
			Namespace:       resourceQuota.Namespace,
			ResourceVersion: resourceQuota.ResourceVersion,
			Labels:          resourceQuota.Labels,
			Annotations:     resourceQuota.Annotations,
		},
		Status: api.ResourceQuotaStatus{
			Hard: hardLimits,
			Used: used,
		},
	}

	if !needsUpdate && quota.Equals(usage.Status.Used, status.Used) {
		return nil
	}
	_, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(&usage)
	return err
}
syncResourceQuota()会计算quota对应namespace下资源的使用情况,如果需要更新,则更新quota。 所以,经过syncResourceQuota()处理过的quota就和实际情况保持一致了。
ResourceQuotaController::addQuota()
接着来看IndexerInformer中使用到的addQuota():
// addQuota enqueues the key of the given ResourceQuota.
//
// The key goes to missingUsageQueue when the status hard limits differ from
// the spec, or when some hard-limited resource has no usage entry although a
// registered evaluator matches it. Otherwise the key goes to the primary queue.
func (rq *ResourceQuotaController) addQuota(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}

	rqObj := obj.(*api.ResourceQuota)
	status := rqObj.Status

	// Status has not caught up with the spec yet.
	if !api.Semantic.DeepEqual(rqObj.Spec.Hard, status.Hard) {
		rq.missingUsageQueue.Add(key)
		return
	}

	// Look for a constrained resource without usage that some evaluator can compute.
	for name := range status.Hard {
		if _, tracked := status.Used[name]; tracked {
			continue
		}
		wanted := []api.ResourceName{name}
		for _, evaluator := range rq.registry.Evaluators() {
			if len(quota.Intersection(evaluator.MatchesResources(), wanted)) != 0 {
				rq.missingUsageQueue.Add(key)
				return
			}
		}
	}

	rq.queue.Add(key)
}
addQuota()会依据情况把quota的key加入到missingUsageQueue或queue中。只要该quota发生了变动,就会对该quota进行处理。
ResourceQuotaController::enqueueResourceQuota()
enqueueResourceQuota()只把quota的key加入到queue中。因为quota的首次处理已经由addQuota()完成,所以到enqueueResourceQuota()处理时,Used字段的初始化已经完成。
// enqueueResourceQuota adds the key of obj to the primary queue. If no key
// can be derived from obj, the failure is logged and nothing is enqueued.
func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err == nil {
		rq.queue.Add(key)
		return
	}
	glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
ResourceQuotaController::enqueueAll()
关于ResourceQuotaController,最后来看下定时处理全部quota的方法enqueueAll():
// enqueueAll puts every key currently held by rq.rqIndexer onto the primary
// queue, then logs at V(4) that a full recalculation was queued.
func (rq *ResourceQuotaController) enqueueAll() {
	defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")

	keys := rq.rqIndexer.ListKeys()
	for i := range keys {
		rq.queue.Add(keys[i])
	}
}
enqueueAll()会把rqIndexer中所有的key取出,放到queue中处理。
replenishmentControllerFactory replenishmentControllerFactory定义在/pkg/controller/resourcequota/replenishment_controller.go中,负责某些资源的Delete或Update检查,进而触发更新quota的操作。ResourceQuotaController是通过调用NewController()来生成replenishmentController的。
replenishmentControllerFactory::NewController()
NewController()会根据不同的资源生成不同的controller。
// NewController returns a controller that watches the kind named by
// options.GroupKind and calls the replenishment callbacks when objects of
// that kind are deleted (and, for Pod and Service, updated).
//
// Supported kinds are Pod, Service, ReplicationController,
// PersistentVolumeClaim, Secret and ConfigMap; any other kind yields a nil
// controller and the error from NewUnhandledGroupKindError.
func (r *replenishmentControllerFactory) NewController (options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) {
// track client rate limiter usage when a client with a limiter is available
if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller" , r.kubeClient.Core().RESTClient().GetRateLimiter())
}
switch options.GroupKind {
case api.Kind("Pod" ):
// prefer the shared informer when a factory was supplied
if r.sharedInformerFactory != nil {
result, err = controllerFor(api.Resource("pods" ), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
UpdateFunc: PodReplenishmentUpdateFunc(options),
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
})
break
}
// NOTE(review): this fallback passes no update/delete handlers, unlike the
// shared-informer path above — confirm whether replenishment is expected here.
result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
case api.Kind("Service" ):
// services react to both updates and deletes
_, result = cache.NewInformer(
&cache.ListWatch{
ListFunc: func (options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().Services(api.NamespaceAll).List(options)
},
WatchFunc: func (options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
},
},
&api.Service{},
options.ResyncPeriod(),
cache.ResourceEventHandlerFuncs{
UpdateFunc: ServiceReplenishmentUpdateFunc(options),
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("ReplicationController" ):
// only deletes trigger replenishment for this kind
_, result = cache.NewInformer(
&cache.ListWatch{
ListFunc: func (options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options)
},
WatchFunc: func (options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).Watch(options)
},
},
&api.ReplicationController{},
options.ResyncPeriod(),
cache.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("PersistentVolumeClaim" ):
// prefer the shared informer when a factory was supplied
if r.sharedInformerFactory != nil {
result, err = controllerFor(api.Resource("persistentvolumeclaims" ), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
})
break
}
// otherwise fall back to a dedicated informer with the same delete handler
_, result = cache.NewInformer(
&cache.ListWatch{
ListFunc: func (options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func (options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
&api.PersistentVolumeClaim{},
options.ResyncPeriod(),
cache.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("Secret" ):
// only deletes trigger replenishment for this kind
_, result = cache.NewInformer(
&cache.ListWatch{
ListFunc: func (options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().Secrets(api.NamespaceAll).List(options)
},
WatchFunc: func (options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().Secrets(api.NamespaceAll).Watch(options)
},
},
&api.Secret{},
options.ResyncPeriod(),
cache.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("ConfigMap" ):
// only deletes trigger replenishment for this kind
_, result = cache.NewInformer(
&cache.ListWatch{
ListFunc: func (options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().ConfigMaps(api.NamespaceAll).List(options)
},
WatchFunc: func (options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().ConfigMaps(api.NamespaceAll).Watch(options)
},
},
&api.ConfigMap{},
options.ResyncPeriod(),
cache.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
default :
// kinds not listed above are not supported
return nil , NewUnhandledGroupKindError(options.GroupKind)
}
return result, err
}
这些controller的关键是UpdateFunc、DeleteFunc的设置。以“pod”为例:
// PodReplenishmentUpdateFunc returns an update handler for pods. The handler
// calls options.ReplenishmentFunc only when core.QuotaPod reports true for
// the old pod and false for the new one.
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
	return func(oldObj, newObj interface{}) {
		before, after := oldObj.(*api.Pod), newObj.(*api.Pod)
		// Nothing to replenish unless the pod just stopped counting against quota.
		if !core.QuotaPod(before) || core.QuotaPod(after) {
			return
		}
		options.ReplenishmentFunc(options.GroupKind, after.Namespace, before)
	}
}
PodReplenishmentUpdateFunc()为UpdateFunc。它会检查oldPod和newPod:如果oldPod需要纳入配额计算,但newPod不再需要纳入计算,则触发ReplenishmentFunc()。
// ObjectReplenishmentDeleteFunc returns a delete handler that calls
// options.ReplenishmentFunc with the namespace of the deleted object.
//
// When obj itself carries no object metadata, the handler expects a
// cache.DeletedFinalStateUnknown tombstone and reads the metadata from the
// wrapped object. If neither works, the error is logged and reported and no
// replenishment is triggered.
func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
	return func(obj interface{}) {
		accessor, err := meta.Accessor(obj)
		if err != nil {
			tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
			if !isTombstone {
				glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
			if accessor, err = meta.Accessor(tombstone.Obj); err != nil {
				glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
		}
		options.ReplenishmentFunc(options.GroupKind, accessor.GetNamespace(), nil)
	}
}
ObjectReplenishmentDeleteFunc()为DeleteFunc,当对象被删除时,它同样会触发ReplenishmentFunc()。
ReplenishmentFunc()
ReplenishmentFunc()其实就是rq.replenishQuota(),从下面代码(定义在resource_quota_controller.go的NewResourceQuotaController()中)可以看出:
// Options for one replenishment controller: the kind to watch, its resync
// period, and the callback invoked when quota may need replenishing.
controllerOptions := &ReplenishmentControllerOptions{
GroupKind: groupKindToReplenish,
ResyncPeriod: options.ReplenishmentResyncPeriod,
// the callback is the quota controller's own replenishQuota method
ReplenishmentFunc: rq.replenishQuota,
}
ResourceQuotaController::replenishQuota()
replenishQuota()会依据资源找出需要更新的quota,并调用enqueueResourceQuota()触发处理动作。
// replenishQuota enqueues every ResourceQuota in namespace whose hard limits
// name at least one resource matched by the evaluator registered for
// groupKind. It does nothing when no evaluator is registered for groupKind
// or when the namespace has no quotas. The object argument is not used.
func (rq *ResourceQuotaController) replenishQuota(groupKind unversioned.GroupKind, namespace string, object runtime.Object) {
	evaluator, found := rq.registry.Evaluators()[groupKind]
	if !found {
		return
	}

	// Query the "namespace" index with a placeholder quota carrying only the namespace.
	lookup := &api.ResourceQuota{}
	lookup.Namespace = namespace
	quotasInNamespace, err := rq.rqIndexer.Index("namespace", lookup)
	if err != nil {
		glog.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod())
	}
	if len(quotasInNamespace) == 0 {
		return
	}

	tracked := evaluator.MatchesResources()
	for _, item := range quotasInNamespace {
		candidate := item.(*api.ResourceQuota)
		constrained := quota.ResourceNames(candidate.Status.Hard)
		if len(quota.Intersection(tracked, constrained)) == 0 {
			continue
		}
		rq.enqueueResourceQuota(candidate)
	}
}
controllermanager
最后来看下controllermanager中对于ResourceQuotaController的调用,代码在/cmd/kube-controller-manager/app/controllermanager.go中:
// Client and quota registry used by the resource quota controller.
resourceQuotaControllerClient := client("resourcequota-controller" )
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers)
// Kinds for which a replenishment controller is requested.
groupKindsToReplenish := []unversioned.GroupKind{
api.Kind("Pod" ),
api.Kind("Service" ),
api.Kind("ReplicationController" ),
api.Kind("PersistentVolumeClaim" ),
api.Kind("Secret" ),
api.Kind("ConfigMap" ),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
// full-recalculation period comes from the ResourceQuotaSyncPeriod setting
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(s),
GroupKindsToReplenish: groupKindsToReplenish,
}
// Run in its own goroutine with wait.NeverStop as the stop channel.
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int (s.ConcurrentResourceQuotaSyncs), wait.NeverStop)
// Sleep for a jittered start interval before continuing.
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
所以,可以看出kube-controller-manager监控了“Pod”、“Service”、“ReplicationController”、“PersistentVolumeClaim”、“Secret”、“ConfigMap”这些资源的Delete操作(其中Pod和Service还监控了Update操作),因为这些资源都和quota相关。