This analysis covers the ResourceQuotaController. We have already seen how Kubernetes checks quota when resources are created, but delete operations go through no quota comparison at all, the object is simply removed; the recorded quota can also drift out of sync with actual usage. Both situations need a mechanism that corrects the quota status, and that mechanism is the ResourceQuotaController.

ResourceQuotaController

ResourceQuotaController is defined in /pkg/controller/resourcequota/resource_quota_controller.go:

// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
	// Must have authority to list all resources in the system, and update quota status
	kubeClient clientset.Interface
	// An index of resource quota objects by namespace
	rqIndexer cache.Indexer
	// Watches changes to all resource quota
	rqController *cache.Controller
	// ResourceQuota objects that need to be synchronized
	queue workqueue.RateLimitingInterface
	// missingUsageQueue holds objects that are missing the initial usage information
	missingUsageQueue workqueue.RateLimitingInterface
	// To allow injection of syncUsage for testing.
	syncHandler func(key string) error
	// function that controls full recalculation of quota usage
	resyncPeriod controller.ResyncPeriodFunc
	// knows how to calculate usage
	registry quota.Registry
	// controllers monitoring to notify for replenishment
	replenishmentControllers []cache.ControllerInterface
}

The fields are as follows:

  1. kubeClient: the Kubernetes client;
  2. rqIndexer: the local store for resource quota objects;
  3. rqController: the controller that watches resource quota objects;
  4. queue: holds the keys of quotas waiting to be processed;
  5. missingUsageQueue: holds the keys of quotas whose Used field has not been initialized yet;
  6. syncHandler: the function that processes the quota identified by a key;
  7. resyncPeriod: controls how often a full recalculation of quota usage happens;
  8. registry: the quota registry; registry.Evaluators() returns all evaluators;
  9. replenishmentControllers: controllers that watch other resources and trigger quota updates after deletes and updates.

A ResourceQuotaController is created with NewResourceQuotaController():

func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
	// build the resource quota controller
	rq := &ResourceQuotaController{
		kubeClient:               options.KubeClient,
		queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
		missingUsageQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
		resyncPeriod:             options.ResyncPeriod,
		registry:                 options.Registry,
		replenishmentControllers: []cache.ControllerInterface{},
	}
	if options.KubeClient != nil && options.KubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", options.KubeClient.Core().RESTClient().GetRateLimiter())
	}
	// set the synchronization handler
	rq.syncHandler = rq.syncResourceQuotaFromKey
	// build the controller that observes quota
	rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return rq.kubeClient.Core().ResourceQuotas(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return rq.kubeClient.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
			},
		},
		&api.ResourceQuota{},
		rq.resyncPeriod(),
		cache.ResourceEventHandlerFuncs{
			AddFunc: rq.addQuota,
			UpdateFunc: func(old, cur interface{}) {
				// We are only interested in observing updates to quota.spec to drive updates to quota.status.
				// We ignore all updates to quota.Status because they are all driven by this controller.
				// IMPORTANT:
				// We do not use this function to queue up a full quota recalculation. To do so, would require
				// us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
				// that cannot be backed by a cache and result in a full query of a namespace's content, we do not
				// want to pay the price on spurious status updates. As a result, we have a separate routine that is
				// responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
				oldResourceQuota := old.(*api.ResourceQuota)
				curResourceQuota := cur.(*api.ResourceQuota)
				if quota.Equals(curResourceQuota.Spec.Hard, oldResourceQuota.Spec.Hard) {
					return
				}
				rq.addQuota(curResourceQuota)
			},
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rq.enqueueResourceQuota,
		},
		cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
	)
	for _, groupKindToReplenish := range options.GroupKindsToReplenish {
		controllerOptions := &ReplenishmentControllerOptions{
			GroupKind:    groupKindToReplenish,
			ResyncPeriod: options.ReplenishmentResyncPeriod,
			//***Fankang***//
			//***ReplenishmentFunc is rq.replenishQuota***//
			ReplenishmentFunc: rq.replenishQuota,
		}
		replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
		if err != nil {
			glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
		} else {
			rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
		}
	}
	return rq
}

The flow of NewResourceQuotaController() is as follows:

  1. Build the ResourceQuotaController;
  2. Set the controller's syncHandler to rq.syncResourceQuotaFromKey;
  3. Build an IndexerInformer for quotas with NewIndexerInformer(), assigning the indexer and controller to rq.rqIndexer and rq.rqController;
    3.1 AddFunc is rq.addQuota();
    3.2 UpdateFunc calls rq.addQuota(), but only when quota.Spec.Hard has changed;
    3.3 DeleteFunc is rq.enqueueResourceQuota();
  4. Build the replenishmentControllers (one replenishmentController for each resource kind in the given list).

ResourceQuotaController::Run()

Every controller must define a Run() method, and ResourceQuotaController is no exception:

// Run begins quota controller using the specified number of workers
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go rq.rqController.Run(stopCh)
	// the controllers that replenish other resources to respond rapidly to state changes
	//***start the replenishment controllers***//
	for _, replenishmentController := range rq.replenishmentControllers {
		go replenishmentController.Run(stopCh)
	}
	// the workers that chug through the quota calculation backlog
	//***start the workers***//
	for i := 0; i < workers; i++ {
		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
	}
	// the timer for how often we do a full recalculation across all quotas
	go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
	<-stopCh
	glog.Infof("Shutting down ResourceQuotaController")
	rq.queue.ShutDown()
}

Run() proceeds as follows:

  1. Start rqController, so changes to quotas are synced into rqIndexer;
  2. Start every replenishmentController in rq.replenishmentControllers;
  3. Start workers goroutines that consume queue;
  4. Start workers goroutines that consume missingUsageQueue;
  5. Start the timer that periodically re-enqueues all quotas for a full recalculation;
  6. Block on stopCh.

So Run() only returns once stopCh is signaled.
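
A minimal sketch of driving the controller from a caller (hypothetical snippet; the real call site appears in the controllermanager section at the end):

stopCh := make(chan struct{})
go rq.Run(5, stopCh) // five workers each for queue and missingUsageQueue
// ... later, signal shutdown:
close(stopCh)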

ResourceQuotaController::worker()

Next, look at worker():

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
	workFunc := func() bool {
		//***get a key from the queue***//
		key, quit := queue.Get()
		if quit {
			return true
		}
		defer queue.Done(key)
		//***process the key***//
		err := rq.syncHandler(key.(string))
		if err == nil {
			queue.Forget(key)
			return false
		}
		utilruntime.HandleError(err)
		queue.AddRateLimited(key)
		return false
	}
	return func() {
		for {
			//***run workFunc()***//
			if quit := workFunc(); quit {
				glog.Infof("resource quota controller worker shutting down")
				return
			}
		}
	}
}

The worker loop pulls a key from the queue and calls syncHandler() on it; here syncHandler() is rq.syncResourceQuotaFromKey(). On success the key is forgotten; on failure it is requeued with rate limiting.
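
For reference, the keys flowing through these queues come from controller.KeyFunc, which for live objects produces "<namespace>/<name>". A minimal, runnable sketch (assuming this code base's vendored package paths):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
)

func main() {
	// MetaNamespaceKeyFunc is the underlying key function: for namespaced
	// objects it returns "<namespace>/<name>".
	key, err := cache.MetaNamespaceKeyFunc(&api.ResourceQuota{
		ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "compute-quota"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // prints "default/compute-quota"
}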

ResourceQuotaController::syncResourceQuotaFromKey()

syncResourceQuotaFromKey() looks up the quota in rqIndexer by its key, then calls syncResourceQuota() to process it.

// syncResourceQuotaFromKey syncs a quota key
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Now().Sub(startTime))
	}()
	//***get the quota for this key from rqIndexer***//
	obj, exists, err := rq.rqIndexer.GetByKey(key)
	if !exists {
		glog.Infof("Resource quota has been deleted %v", key)
		return nil
	}
	if err != nil {
		glog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
		rq.queue.Add(key)
		return err
	}
	quota := *obj.(*api.ResourceQuota)
	//***call syncResourceQuota()***//
	return rq.syncResourceQuota(quota)
}

ResourceQuotaController::syncResourceQuota()

// syncResourceQuota runs a complete sync of resource quota status across all known kinds
//***Fankang***//
//***process the resourceQuota***//
func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota api.ResourceQuota) (err error) {
	// quota is dirty if any part of spec hard limits differs from the status hard limits
	dirty := !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
	// dirty tracks if the usage status differs from the previous sync,
	// if so, we send a new usage with latest status
	// if this is our first sync, it will be dirty by default, since we need to track usage
	//***Fankang***//
	//***dirty indicates whether an update is needed***//
	dirty = dirty || (resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil)
	used := api.ResourceList{}
	if resourceQuota.Status.Used != nil {
		used = quota.Add(api.ResourceList{}, resourceQuota.Status.Used)
	}
	//***Fankang***//
	//***status.hard must equal spec.hard***//
	hardLimits := quota.Add(api.ResourceList{}, resourceQuota.Spec.Hard)
	//***calculate the new resource usage***//
	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)
	if err != nil {
		return err
	}
	for key, value := range newUsage {
		used[key] = value
	}
	// ensure set of used values match those that have hard constraints
	hardResources := quota.ResourceNames(hardLimits)
	used = quota.Mask(used, hardResources)
	// Create a usage object that is based on the quota resource version that will handle updates
	// by default, we preserve the past usage observation, and set hard to the current spec
	usage := api.ResourceQuota{
		ObjectMeta: api.ObjectMeta{
			Name:            resourceQuota.Name,
			Namespace:       resourceQuota.Namespace,
			ResourceVersion: resourceQuota.ResourceVersion,
			Labels:          resourceQuota.Labels,
			Annotations:     resourceQuota.Annotations},
		Status: api.ResourceQuotaStatus{
			Hard: hardLimits,
			Used: used,
		},
	}
	dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)
	// there was a change observed by this controller that requires we update quota
	//***if an update is needed, do it***//
	if dirty {
		_, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(&usage)
		return err
	}
	return nil
}

syncResourceQuota() computes the actual usage of resources in the quota's namespace and, if an update is needed, updates the quota's status. A quota that has been through syncResourceQuota() therefore agrees with the actual state of the cluster.
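
The quota.Mask call above is what keeps Status.Used aligned with the hard constraints. A sketch of its semantics (illustrative only, not the implementation in pkg/quota):

// mask keeps only the entries of used whose names appear in names, so
// usage is never reported for a resource that has no hard constraint.
func mask(used api.ResourceList, names []api.ResourceName) api.ResourceList {
	result := api.ResourceList{}
	for _, name := range names {
		if value, found := used[name]; found {
			result[name] = *value.Copy()
		}
	}
	return result
}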

ResourceQuotaController::addQuota()

Next, look at addQuota(), which the IndexerInformer uses:

//***add the quota's key to a queue***//
func (rq *ResourceQuotaController) addQuota(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	resourceQuota := obj.(*api.ResourceQuota)
	// if we declared an intent that is not yet captured in status (prioritize it)
	if !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
		rq.missingUsageQueue.Add(key)
		return
	}
	// if we declared a constraint that has no usage (which this controller can calculate, prioritize it)
	for constraint := range resourceQuota.Status.Hard {
		if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
			matchedResources := []api.ResourceName{constraint}
			for _, evaluator := range rq.registry.Evaluators() {
				if intersection := quota.Intersection(evaluator.MatchesResources(), matchedResources); len(intersection) != 0 {
					rq.missingUsageQueue.Add(key)
					return
				}
			}
		}
	}
	// no special priority, go in normal recalc queue
	rq.queue.Add(key)
}

addQuota() adds the quota's key to missingUsageQueue or queue depending on the situation: quotas whose status lags their spec, or whose usage has not been computed yet, go to the priority queue. Whenever a quota changes, it gets enqueued for processing.
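
quota.Intersection, used here and again in replenishQuota() below, simply computes the resource names common to two lists. A sketch of its semantics (illustrative only; the real function lives in pkg/quota):

// intersection returns the resource names present in both a and b; addQuota
// uses it to ask whether any evaluator can compute usage for a constraint
// that has no recorded usage yet.
func intersection(a, b []api.ResourceName) []api.ResourceName {
	inA := map[api.ResourceName]bool{}
	for _, name := range a {
		inA[name] = true
	}
	result := []api.ResourceName{}
	for _, name := range b {
		if inA[name] {
			result = append(result, name)
		}
	}
	return result
}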

ResourceQuotaController::enqueueResourceQuota()

enqueueResourceQuota() simply adds the quota's key to queue. It is not used for a quota's first pass; that is handled by addQuota(), so by the time enqueueResourceQuota() runs, the Used field has already been initialized.

// obj could be an *api.ResourceQuota, or a DeletionFinalStateUnknown marker item.
func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}
	rq.queue.Add(key)
}

ResourceQuotaController::enqueueAll()

To finish with ResourceQuotaController, look at enqueueAll(), the method that periodically reprocesses all quotas:

// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
//***Fankang***//
//***add all keys to the queue***//
func (rq *ResourceQuotaController) enqueueAll() {
	defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
	for _, k := range rq.rqIndexer.ListKeys() {
		rq.queue.Add(k)
	}
}

enqueueAll() takes every key from rqIndexer and puts it on queue for processing.

replenishmentControllerFactory

replenishmentControllerFactory is defined in /pkg/controller/resourcequota/replenishment_controller.go. It watches Delete and Update events on certain resources and, in response, triggers quota updates. ResourceQuotaController creates each replenishmentController by calling NewController().

replenishmentControllerFactory::NewController()

NewController() builds a different controller depending on the resource kind.

func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) {
	if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().RESTClient().GetRateLimiter())
	}
	switch options.GroupKind {
	case api.Kind("Pod"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				UpdateFunc: PodReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
	case api.Kind("Service"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options api.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Services(api.NamespaceAll).List(options)
				},
				WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
				},
			},
			&api.Service{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				UpdateFunc: ServiceReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ReplicationController"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options api.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options)
				},
				WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).Watch(options)
				},
			},
			&api.ReplicationController{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("PersistentVolumeClaim"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		// TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths...
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options api.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
				},
				WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
				},
			},
			&api.PersistentVolumeClaim{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("Secret"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options api.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Secrets(api.NamespaceAll).List(options)
				},
				WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Secrets(api.NamespaceAll).Watch(options)
				},
			},
			&api.Secret{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ConfigMap"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options api.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ConfigMaps(api.NamespaceAll).List(options)
				},
				WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ConfigMaps(api.NamespaceAll).Watch(options)
				},
			},
			&api.ConfigMap{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	default:
		return nil, NewUnhandledGroupKindError(options.GroupKind)
	}
	return result, err
}

The key part of each of these controllers is its UpdateFunc and DeleteFunc. Take "Pod" as an example:

// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
	return func(oldObj, newObj interface{}) {
		oldPod := oldObj.(*api.Pod)
		newPod := newObj.(*api.Pod)
		//***Fankang***//
		//***QuotaPod() checks whether the pod should be counted against quota***//
		if core.QuotaPod(oldPod) && !core.QuotaPod(newPod) {
			options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
		}
	}
}

PodReplenishmentUpdateFunc() is the UpdateFunc. It compares oldPod and newPod: if oldPod needed to be counted against quota but newPod no longer does, it triggers ReplenishmentFunc().
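
core.QuotaPod() decides whether a pod counts against quota. A sketch of the condition it checks (see pkg/quota/evaluator/core): a pod stops being charged once it reaches a terminal phase, so this update handler fires exactly when a pod transitions into Succeeded or Failed and its quota can be released.

// quotaPod reports whether the pod should be charged against quota;
// pods in a terminal phase no longer consume quota.
func quotaPod(pod *api.Pod) bool {
	return pod.Status.Phase != api.PodFailed && pod.Status.Phase != api.PodSucceeded
}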

// ObjectReplenishmentDeleteFunc will replenish on every delete
func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
	return func(obj interface{}) {
		metaObject, err := meta.Accessor(obj)
		if err != nil {
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
			metaObject, err = meta.Accessor(tombstone.Obj)
			if err != nil {
				glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
		}
		options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)
	}
}

ObjectReplenishmentDeleteFunc() is the DeleteFunc; it triggers ReplenishmentFunc() on every delete, unwrapping the DeletedFinalStateUnknown tombstones the cache may deliver when a watch missed the actual delete event.

ReplenishmentFunc()

ReplenishmentFunc() is in fact rq.replenishQuota(), as the following code (from NewResourceQuotaController() in resource_quota_controller.go) shows:

controllerOptions := &ReplenishmentControllerOptions{
	GroupKind:    groupKindToReplenish,
	ResyncPeriod: options.ReplenishmentResyncPeriod,
	//***Fankang***//
	//***ReplenishmentFunc is rq.replenishQuota***//
	ReplenishmentFunc: rq.replenishQuota,
}

ResourceQuotaController::replenishQuota()

replenishQuota() finds the quotas affected by the given resource and calls enqueueResourceQuota() on each to trigger reprocessing.

// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *ResourceQuotaController) replenishQuota(groupKind unversioned.GroupKind, namespace string, object runtime.Object) {
	// check if the quota controller can evaluate this kind, if not, ignore it altogether...
	evaluators := rq.registry.Evaluators()
	evaluator, found := evaluators[groupKind]
	if !found {
		return
	}
	// check if this namespace even has a quota...
	indexKey := &api.ResourceQuota{}
	indexKey.Namespace = namespace
	resourceQuotas, err := rq.rqIndexer.Index("namespace", indexKey)
	if err != nil {
		glog.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod())
	}
	if len(resourceQuotas) == 0 {
		return
	}
	// only queue those quotas that are tracking a resource associated with this kind.
	matchedResources := evaluator.MatchesResources()
	for i := range resourceQuotas {
		resourceQuota := resourceQuotas[i].(*api.ResourceQuota)
		resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
		//***Fankang***//
		//***if this quota tracks any of the matched resources, reprocess it***//
		if len(quota.Intersection(matchedResources, resourceQuotaResources)) > 0 {
			// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
			rq.enqueueResourceQuota(resourceQuota)
		}
	}
}

controllermanager

Finally, look at how the controller manager wires up the ResourceQuotaController; the code is in /cmd/kube-controller-manager/app/controllermanager.go:

resourceQuotaControllerClient := client("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers)
groupKindsToReplenish := []unversioned.GroupKind{
	api.Kind("Pod"),
	api.Kind("Service"),
	api.Kind("ReplicationController"),
	api.Kind("PersistentVolumeClaim"),
	api.Kind("Secret"),
	api.Kind("ConfigMap"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
	KubeClient:                resourceQuotaControllerClient,
	ResyncPeriod:              controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
	Registry:                  resourceQuotaRegistry,
	ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers, resourceQuotaControllerClient),
	ReplenishmentResyncPeriod: ResyncPeriod(s),
	GroupKindsToReplenish:     groupKindsToReplenish,
}
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

So kube-controller-manager watches Update and Delete operations on "Pod", "Service", "ReplicationController", "PersistentVolumeClaim", "Secret", and "ConfigMap", since all of these resources are tracked by quota.
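
As a closing illustration, here is a hypothetical quota object, built in memory with this code base's api and api/resource packages (the name and values are made up), of the kind the controller keeps reconciled:

rqExample := &api.ResourceQuota{
	ObjectMeta: api.ObjectMeta{Name: "object-counts", Namespace: "default"},
	Spec: api.ResourceQuotaSpec{
		Hard: api.ResourceList{
			// allow at most 10 pods and 20 secrets in "default"
			api.ResourcePods:    resource.MustParse("10"),
			api.ResourceSecrets: resource.MustParse("20"),
		},
	},
}

When a pod in "default" reaches a terminal phase or a secret is deleted, the matching replenishment controller calls replenishQuota(), this quota is enqueued, and syncResourceQuota() recomputes Status.Used so the freed capacity becomes available again.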