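The previous analysis explained how an evaluator computes the resources an object consumes. This one covers how Kubernetes decides whether the quota is sufficient and how it updates the quota.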

Registration

As mentioned before, every admission plugin must register itself with the plugins registry in admission. The registration code for resourcequota is in /plugin/pkg/admission/resourcequota/admission.go:

func init() {
	// Register the ResourceQuota factory function with admission's plugin registry.
	admission.RegisterPlugin("ResourceQuota",
		func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
			// NOTE: we do not provide informers to the registry because admission level decisions
			// does not require us to open watches for all items tracked by quota.
			registry := install.NewRegistry(client, nil)
			return NewResourceQuota(client, registry, 5, make(chan struct{}))
		})
}

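In init(), the resourcequota plugin is built in two steps: a registry is created first, and then NewResourceQuota() is called to produce the quotaAdmission. The arguments 5 and make(chan struct{}) are the number of evaluator workers and the stop channel.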
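
The registration pattern described above can be sketched with a plain map from plugin name to factory function. This is a minimal stdlib-only illustration, not the Kubernetes implementation: the Interface type, the Factory signature, NewPlugin, and the panic on duplicate names are all assumptions made for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Interface is a hypothetical, simplified stand-in for admission.Interface.
type Interface interface {
	Admit(kind string) error
}

// Factory builds a plugin instance (simplified: the real factory also
// receives a client and a config reader).
type Factory func() (Interface, error)

var (
	pluginsMu sync.Mutex
	plugins   = map[string]Factory{}
)

// RegisterPlugin stores a factory under a name. Panicking on a duplicate
// name is a choice made for this sketch.
func RegisterPlugin(name string, f Factory) {
	pluginsMu.Lock()
	defer pluginsMu.Unlock()
	if _, dup := plugins[name]; dup {
		panic("plugin registered twice: " + name)
	}
	plugins[name] = f
}

// NewPlugin looks up the factory by name and invokes it.
func NewPlugin(name string) (Interface, error) {
	pluginsMu.Lock()
	f, ok := plugins[name]
	pluginsMu.Unlock()
	if !ok {
		return nil, fmt.Errorf("unknown admission plugin: %s", name)
	}
	return f()
}

// allowAll is a toy plugin that admits everything.
type allowAll struct{}

func (allowAll) Admit(kind string) error { return nil }

// init runs before main, so the plugin is registered by the time it is needed.
func init() {
	RegisterPlugin("ResourceQuota", func() (Interface, error) { return allowAll{}, nil })
}

func main() {
	p, err := NewPlugin("ResourceQuota")
	fmt.Println(p != nil, err)
}
```

Registering from init() means that importing the package is enough to make the plugin available by name.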

quotaAdmission

quotaAdmission wraps an admission.Handler and an Evaluator (this Evaluator is the quota-level Evaluator, not the one discussed in the previous analysis). quotaAdmission is defined in /plugin/pkg/admission/resourcequota/admission.go:

// quotaAdmission implements an admission controller that can enforce quota constraints
type quotaAdmission struct {
	*admission.Handler
	evaluator Evaluator
}

Now let's look at the constructor of quotaAdmission:

func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
	quotaAccessor, err := newQuotaAccessor(client)
	if err != nil {
		return nil, err
	}
	go quotaAccessor.Run(stopCh)
	// Create the quotaEvaluator.
	evaluator := NewQuotaEvaluator(quotaAccessor, registry, nil, numEvaluators, stopCh)
	return &quotaAdmission{
		// ResourceQuota only handles Create and Update requests.
		Handler: admission.NewHandler(admission.Create, admission.Update),
		evaluator: evaluator,
	}, nil
}

The constructor first creates a quotaEvaluator with NewQuotaEvaluator(), then wraps it in a quotaAdmission and returns it.

quotaAdmission also provides the entry point for quota checks:

// Admit is the entry point for quota checks.
func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
	// ignore all operations that correspond to sub-resource actions
	if a.GetSubresource() != "" {
		return nil
	}
	return q.evaluator.Evaluate(a)
}

Admit() decides whether a request is accepted. It ignores sub-resource requests and otherwise delegates to the quotaEvaluator's Evaluate() function.

quotaEvaluator

quotaEvaluator is the struct that processes the actual requests. It is defined in /plugin/pkg/admission/resourcequota/controller.go:

type quotaEvaluator struct {
	quotaAccessor QuotaAccessor
	// lockAquisitionFunc acquires any required locks and returns a cleanup method to defer
	lockAquisitionFunc func([]api.ResourceQuota) func()
	// registry that knows how to measure usage for objects
	registry quota.Registry
	// TODO these are used together to bucket items by namespace and then batch them up for processing.
	// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
	// We could move this into a library if another component needed it.
	// queue is indexed by namespace, so that we bundle up on a per-namespace basis
	queue *workqueue.Type
	workLock sync.Mutex
	work map[string][]*admissionWaiter
	dirtyWork map[string][]*admissionWaiter
	inProgress sets.String
	// controls the run method so that we can cleanly conform to the Evaluator interface
	workers int
	stopCh <-chan struct{}
	// sync.Once.Do(f func()) runs at most once per Once value: even if a later call
	// passes a different function to once.Do, nothing is executed a second time.
	init sync.Once
}

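Where:

  • quotaAccessor: reads the quota objects;
  • registry: manages the usage calculators such as podEvaluator;
  • queue: the queue of namespaces waiting to be processed;
  • work: the pending requests, keyed by namespace;
  • dirtyWork: requests that arrive while their namespace is being processed are parked here;
  • inProgress: the set of namespaces currently being processed;
  • workers: the number of goroutines that process requests.

quotaEvaluator starts a fixed number of workers. Every request is stored under its namespace key in either work or dirtyWork (in dirtyWork if that namespace is currently being processed). A worker takes all the requests queued in work for one namespace, evaluates them together, and updates the resource quota once for the whole batch. A request that would exceed the quota gets an error as its result. When the worker finishes a namespace, the requests parked under that namespace in dirtyWork are moved to work.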
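
The work/dirtyWork/inProgress bookkeeping described above can be modelled in a few lines. The sketch below is a simplified, hypothetical model: the batcher type and its add/take/complete methods are invented for illustration, requests are plain strings, and the workqueue is left out entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// batcher is a hypothetical, simplified model of the per-namespace batching.
type batcher struct {
	mu         sync.Mutex
	work       map[string][]string
	dirtyWork  map[string][]string
	inProgress map[string]bool
}

func newBatcher() *batcher {
	return &batcher{
		work:       map[string][]string{},
		dirtyWork:  map[string][]string{},
		inProgress: map[string]bool{},
	}
}

// add parks the request in dirtyWork while its namespace is being processed;
// otherwise the request joins the next batch in work.
func (b *batcher) add(ns, req string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.inProgress[ns] {
		b.dirtyWork[ns] = append(b.dirtyWork[ns], req)
		return
	}
	b.work[ns] = append(b.work[ns], req)
}

// take removes and returns the whole pending batch for ns and, if the batch
// is not empty, marks the namespace as in progress.
func (b *batcher) take(ns string) []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	batch := b.work[ns]
	delete(b.work, ns)
	if len(batch) > 0 {
		b.inProgress[ns] = true
	}
	return batch
}

// complete promotes the parked requests to work and clears the in-progress mark.
func (b *batcher) complete(ns string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if parked := b.dirtyWork[ns]; len(parked) > 0 {
		b.work[ns] = parked
	}
	delete(b.dirtyWork, ns)
	delete(b.inProgress, ns)
}

func main() {
	b := newBatcher()
	b.add("dev", "pod-a")
	b.add("dev", "pod-b")
	first := b.take("dev") // both requests form one batch
	b.add("dev", "pod-c")  // arrives while "dev" is in progress, so it is parked
	b.complete("dev")
	second := b.take("dev")
	fmt.Println(first, second)
}
```

Batching per namespace lets one quota update cover many requests, which is what keeps contention on the quota object low.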

NewQuotaEvaluator()

NewQuotaEvaluator() creates a quotaEvaluator.

func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator {
	return &quotaEvaluator{
		quotaAccessor: quotaAccessor,
		lockAquisitionFunc: lockAquisitionFunc,
		registry: registry,
		queue: workqueue.NewNamed("admission_quota_controller"),
		work: map[string][]*admissionWaiter{},
		dirtyWork: map[string][]*admissionWaiter{},
		inProgress: sets.String{},
		workers: workers,
		stopCh: stopCh,
	}
}

Evaluate()

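Evaluate() starts the quotaEvaluator's controller, wraps the attributes in a waiter, and calls addWork() to hand the waiter to the quotaEvaluator. Note that go e.run() is executed only once, because it is guarded by sync.Once. The caller then blocks until the waiter is finished or 10 seconds have elapsed.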

// Evaluate is the entry point.
func (e *quotaEvaluator) Evaluate(a admission.Attributes) error {
	e.init.Do(func() {
		go e.run()
	})
	// if we do not know how to evaluate use for this kind, just ignore
	evaluators := e.registry.Evaluators()
	evaluator, found := evaluators[a.GetKind().GroupKind()]
	if !found {
		return nil
	}
	// for this kind, check if the operation could mutate any quota resources
	// if no resources tracked by quota are impacted, then just return
	op := a.GetOperation()
	operationResources := evaluator.OperationResources(op)
	if len(operationResources) == 0 {
		return nil
	}
	waiter := newAdmissionWaiter(a)
	e.addWork(waiter)
	// wait for completion or timeout
	select {
	case <-waiter.finished:
	case <-time.After(10 * time.Second):
		return fmt.Errorf("timeout")
	}
	return waiter.result
}
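
The two mechanisms Evaluate() relies on, a background loop started once through sync.Once and a caller blocked on a finished channel with a timeout, can be tried out in isolation. The following is a toy model under stated assumptions: the evaluator type, newEvaluator, the delay field, and the millisecond parameters are hypothetical, and the "check" always admits.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// waiter is a hypothetical, simplified stand-in for admissionWaiter.
type waiter struct {
	finished chan struct{} // closed once a result is available
	result   error
}

// evaluator is a toy model; delay simulates how long a quota check takes.
type evaluator struct {
	init    sync.Once
	pending chan *waiter
	delay   time.Duration
	starts  int // how many times the background loop was launched
}

func newEvaluator(delayMillis int) *evaluator {
	return &evaluator{
		pending: make(chan *waiter, 16),
		delay:   time.Duration(delayMillis) * time.Millisecond,
	}
}

// run processes waiters one by one and wakes each caller by closing finished.
func (e *evaluator) run() {
	for w := range e.pending {
		time.Sleep(e.delay)
		w.result = nil // the toy check always admits
		close(w.finished)
	}
}

// evaluate enqueues a waiter and blocks until it finishes or the timeout fires.
func (e *evaluator) evaluate(timeoutMillis int) error {
	e.init.Do(func() { // only the first caller launches the loop
		e.starts++
		go e.run()
	})
	w := &waiter{finished: make(chan struct{})}
	e.pending <- w
	select {
	case <-w.finished:
		return w.result
	case <-time.After(time.Duration(timeoutMillis) * time.Millisecond):
		return errors.New("timeout")
	}
}

func main() {
	fast := newEvaluator(0)
	err1 := fast.evaluate(1000)
	err2 := fast.evaluate(1000)
	fmt.Println(err1, err2, fast.starts) // two calls, one loop start

	slow := newEvaluator(500)
	fmt.Println(slow.evaluate(20)) // the check outlasts the timeout
}
```

Closing the channel, rather than sending on it, means the worker never blocks even if the caller has already given up after the timeout.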

addWork()

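addWork() adds an admissionWaiter to the quotaEvaluator. It first reads the waiter's namespace and adds that namespace to queue to mark that its quota needs processing. If the namespace is currently being processed, the waiter is appended to dirtyWork; otherwise it is appended to work.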

func (e *quotaEvaluator) addWork(a *admissionWaiter) {
	e.workLock.Lock()
	defer e.workLock.Unlock()
	ns := a.attributes.GetNamespace()
	// this Add can trigger a Get BEFORE the work is added to a list, but this is ok because the getWork routine
	// waits the worklock before retrieving the work to do, so the writes in this method will be observed
	e.queue.Add(ns)
	// If requests for this namespace are being processed, park the new request in dirtyWork.
	if e.inProgress.Has(ns) {
		e.dirtyWork[ns] = append(e.dirtyWork[ns], a)
		return
	}
	e.work[ns] = append(e.work[ns], a)
}

getWork()

getWork() first takes a namespace to process from queue, then fetches the requests under that namespace and adds the namespace to inProgress to mark it as being processed.

// getWork returns one namespace and the requests queued under it.
func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
	uncastNS, shutdown := e.queue.Get()
	if shutdown {
		return "", []*admissionWaiter{}, shutdown
	}
	ns := uncastNS.(string)
	e.workLock.Lock()
	defer e.workLock.Unlock()
	// at this point, we know we have a coherent view of e.work. It is entirely possible
	// that our workqueue has another item requeued to it, but we'll pick it up early. This ok
	// because the next time will go into our dirty list
	// Processing of this namespace starts here: the batch has been taken, so clear
	// its entries in work and dirtyWork.
	work := e.work[ns]
	delete(e.work, ns)
	delete(e.dirtyWork, ns)
	if len(work) != 0 {
		e.inProgress.Insert(ns)
		return ns, work, false
	}
	e.queue.Done(ns)
	e.inProgress.Delete(ns)
	return ns, []*admissionWaiter{}, false
}

completeWork()

completeWork() moves the requests in dirtyWork to work and removes the namespace from inProgress.

// completeWork moves the requests under ns from dirtyWork to work and
// removes ns from inProgress, the set of namespaces being processed.
func (e *quotaEvaluator) completeWork(ns string) {
	e.workLock.Lock()
	defer e.workLock.Unlock()
	e.queue.Done(ns)
	e.work[ns] = e.dirtyWork[ns]
	delete(e.dirtyWork, ns)
	e.inProgress.Delete(ns)
}

run()

run() starts workers goroutines to process requests; each goroutine runs the doWork loop. It then blocks until stopCh is closed and shuts the queue down.

// Run begins watching and syncing.
func (e *quotaEvaluator) run() {
	defer utilruntime.HandleCrash()
	// Start workers goroutines that perform the resource quota admit checks.
	for i := 0; i < e.workers; i++ {
		go wait.Until(e.doWork, time.Second, e.stopCh)
	}
	<-e.stopCh
	glog.Infof("Shutting down quota evaluator")
	e.queue.ShutDown()
}
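
The shape of run(), a fixed number of workers that keep draining a queue until a stop signal arrives, can be reproduced with only the standard library. This is an analogue, not the original mechanism: the pool type and its methods are hypothetical, a plain channel replaces the workqueue, and workers watch the stop channel directly instead of being released by queue.ShutDown().

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stdlib-only analogue of the worker setup in run().
type pool struct {
	queue chan string   // namespaces to process
	stop  chan struct{} // closed to request shutdown
	wg    sync.WaitGroup
	mu    sync.Mutex
	done  []string // namespaces that were processed
}

func newPool(workers int) *pool {
	p := &pool{queue: make(chan string), stop: make(chan struct{})}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go p.worker()
	}
	return p
}

// worker loops until the stop channel is closed.
func (p *pool) worker() {
	defer p.wg.Done()
	for {
		select {
		case ns := <-p.queue:
			p.mu.Lock()
			p.done = append(p.done, ns)
			p.mu.Unlock()
		case <-p.stop:
			return
		}
	}
}

// shutdown closes the stop channel and waits for every worker to exit.
func (p *pool) shutdown() {
	close(p.stop)
	p.wg.Wait()
}

func main() {
	p := newPool(5)
	for _, ns := range []string{"dev", "test", "prod"} {
		p.queue <- ns // unbuffered: returns once a worker has taken the item
	}
	p.shutdown()
	fmt.Println(len(p.done))
}
```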

doWork()

doWork() first takes a namespace and its requests from the quotaEvaluator, then calls checkAttributes() to check those requests against the quota, and finally calls completeWork() to move the requests in dirtyWork to work.

// doWork takes one unit of work from the quotaEvaluator and runs the quota check on it.
func (e *quotaEvaluator) doWork() {
	workFunc := func() bool {
		// Get a namespace and the requests queued under it.
		ns, admissionAttributes, quit := e.getWork()
		if quit {
			return true
		}
		// Once the requests in work are handled, move the requests in dirtyWork to the work map.
		defer e.completeWork(ns)
		if len(admissionAttributes) == 0 {
			return false
		}
		// Check the quota.
		e.checkAttributes(ns, admissionAttributes)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("quota evaluator worker shutdown")
			return
		}
	}
}

checkAttributes()

checkAttributes() first fetches the quotas in the namespace, then calls checkQuotas() to check the requests against them, and finally notifies the waiters that the check is complete.

// checkAttributes iterates evaluates all the waiting admissionAttributes. It will always notify all waiters
// before returning. The default is to deny.
func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admissionWaiter) {
	// notify all waiters on exit
	defer func() {
		for _, admissionAttribute := range admissionAttributes {
			close(admissionAttribute.finished)
		}
	}()
	// Fetch the quotas in this namespace.
	quotas, err := e.quotaAccessor.GetQuotas(ns)
	if err != nil {
		for _, admissionAttribute := range admissionAttributes {
			admissionAttribute.result = err
		}
		return
	}
	if len(quotas) == 0 {
		for _, admissionAttribute := range admissionAttributes {
			admissionAttribute.result = nil
		}
		return
	}
	if e.lockAquisitionFunc != nil {
		releaseLocks := e.lockAquisitionFunc(quotas)
		defer releaseLocks()
	}
	// Check the resource quotas and try to update them if needed (with 3 retries).
	e.checkQuotas(quotas, admissionAttributes, 3)
}
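
checkAttributes() guarantees that every waiter is notified on every return path by closing the finished channels in a deferred function, and its comment states that the default is to deny. A minimal sketch of that combination follows. It assumes that a waiter starts out holding a sentinel deny error, which the source implies through IsDefaultDeny but does not show; errDefaultDeny, newWaiter, checkAll, and the loadQuotas callback are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errDefaultDeny is a hypothetical sentinel error for "not decided yet".
var errDefaultDeny = errors.New("default deny")

type waiter struct {
	finished chan struct{}
	result   error
}

// newWaiter creates a waiter that is denied until something admits it
// (an assumption about the initial state).
func newWaiter() *waiter {
	return &waiter{finished: make(chan struct{}), result: errDefaultDeny}
}

// checkAll resolves the waiters and always closes finished, whichever
// return path is taken. loadQuotas reports how many quotas exist.
func checkAll(waiters []*waiter, loadQuotas func() (int, error)) {
	defer func() {
		for _, w := range waiters {
			close(w.finished)
		}
	}()
	n, err := loadQuotas()
	if err != nil {
		for _, w := range waiters {
			w.result = err
		}
		return
	}
	if n == 0 { // no quota in the namespace: nothing to enforce
		for _, w := range waiters {
			w.result = nil
		}
		return
	}
	// A real implementation would run the quota check here. Waiters that
	// are not explicitly resolved keep the default deny.
}

func main() {
	ws := []*waiter{newWaiter(), newWaiter()}
	checkAll(ws, func() (int, error) { return 0, nil })
	<-ws[1].finished // does not block: the deferred loop closed the channel
	fmt.Println(ws[0].result, ws[1].result)
}
```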

checkRequest()

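checkRequest() computes the resources a request needs and compares them with the quota. If used + requested ≤ hard limit, it returns the new usage. In other words, checkRequest() adds the request's resource amounts to each relevant quota in the namespace (there may be several). See the comments in the code for the detailed steps.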

func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) {
	namespace := a.GetNamespace()
	evaluators := e.registry.Evaluators()
	// Look up the evaluator for this request's kind.
	evaluator, found := evaluators[a.GetKind().GroupKind()]
	if !found {
		return quotas, nil
	}
	// Get the quota resources this evaluator tracks for the operation.
	op := a.GetOperation()
	operationResources := evaluator.OperationResources(op)
	if len(operationResources) == 0 {
		return quotas, nil
	}
	// find the set of quotas that are pertinent to this request
	// reject if we match the quota, but usage is not calculated yet
	// reject if the input object does not satisfy quota constraints
	// if there are no pertinent quotas, we can just return
	inputObject := a.GetObject()
	interestingQuotaIndexes := []int{}
	for i := range quotas {
		resourceQuota := quotas[i]
		// Skip this resourceQuota if it is not relevant to the object handled by the evaluator.
		match := evaluator.Matches(&resourceQuota, inputObject)
		if !match {
			continue
		}
		hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
		evaluatorResources := evaluator.MatchesResources()
		requiredResources := quota.Intersection(hardResources, evaluatorResources)
		// Check that the resources in inputObject satisfy the constraints.
		if err := evaluator.Constraints(requiredResources, inputObject); err != nil {
			return nil, admission.NewForbidden(a, fmt.Errorf("failed quota: %s: %v", resourceQuota.Name, err))
		}
		if !hasUsageStats(&resourceQuota) {
			return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name))
		}
		// Remember the quotas that apply to this request.
		interestingQuotaIndexes = append(interestingQuotaIndexes, i)
	}
	if len(interestingQuotaIndexes) == 0 {
		return quotas, nil
	}
	// Usage of some resources cannot be counted in isolation. For example, when
	// the resource represents a number of unique references to external
	// resource. In such a case an evaluator needs to process other objects in
	// the same namespace which needs to be known.
	if accessor, err := meta.Accessor(inputObject); namespace != "" && err == nil {
		if accessor.GetNamespace() == "" {
			accessor.SetNamespace(namespace)
		}
	}
	// there is at least one quota that definitely matches our object
	// as a result, we need to measure the usage of this object for quota
	// on updates, we need to subtract the previous measured usage
	// if usage shows no change, just return since it has no impact on quota
	// Compute the usage requested by the object.
	deltaUsage := evaluator.Usage(inputObject)
	// ensure that usage for input object is never negative (this would mean a resource made a negative resource requirement)
	// Check whether any item in deltaUsage is negative.
	if negativeUsage := quota.IsNegative(deltaUsage); len(negativeUsage) > 0 {
		return nil, admission.NewForbidden(a, fmt.Errorf("quota usage is negative for resource(s): %s", prettyPrintResourceNames(negativeUsage)))
	}
	// For Update, deltaUsage = requested usage - usage of the previous version.
	if admission.Update == op {
		prevItem := a.GetOldObject()
		if prevItem == nil {
			return nil, admission.NewForbidden(a, fmt.Errorf("unable to get previous usage since prior version of object was not found"))
		}
		// if we can definitively determine that this is not a case of "create on update",
		// then charge based on the delta. Otherwise, bill the maximum
		metadata, err := meta.Accessor(prevItem)
		if err == nil && len(metadata.GetResourceVersion()) > 0 {
			prevUsage := evaluator.Usage(prevItem)
			deltaUsage = quota.Subtract(deltaUsage, prevUsage)
		}
	}
	if quota.IsZero(deltaUsage) {
		return quotas, nil
	}
	outQuotas, err := copyQuotas(quotas)
	if err != nil {
		return nil, err
	}
	for _, index := range interestingQuotaIndexes {
		resourceQuota := outQuotas[index]
		hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
		requestedUsage := quota.Mask(deltaUsage, hardResources)
		// Add the requested usage to the quota's current usage.
		newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage)
		maskedNewUsage := quota.Mask(newUsage, quota.ResourceNames(requestedUsage))
		// Compare the new usage against the hard limit.
		if allowed, exceeded := quota.LessThanOrEqual(maskedNewUsage, resourceQuota.Status.Hard); !allowed {
			failedRequestedUsage := quota.Mask(requestedUsage, exceeded)
			failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded)
			failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded)
			return nil, admission.NewForbidden(a,
				fmt.Errorf("exceeded quota: %s, requested: %s, used: %s, limited: %s",
					resourceQuota.Name,
					prettyPrint(failedRequestedUsage),
					prettyPrint(failedUsed),
					prettyPrint(failedHard)))
		}
		// update to the new usage number
		outQuotas[index].Status.Used = newUsage
	}
	return outQuotas, nil
}
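
The arithmetic at the core of checkRequest() (take the delta on update, mask it to the resources the quota limits, add it to the current usage, and compare with the hard limit) can be worked through with plain integers. The sketch below is an assumption-laden simplification: ResourceList here is a map of int64 rather than the real quantity type, and subtract, mask, and check are hypothetical helpers that only mirror the roles of quota.Subtract, quota.Mask, quota.Add, and quota.LessThanOrEqual.

```go
package main

import "fmt"

// ResourceList is a hypothetical simplification: plain integers per resource name.
type ResourceList map[string]int64

// subtract returns a-b per resource name, covering the names of both maps.
func subtract(a, b ResourceList) ResourceList {
	out := ResourceList{}
	for k, v := range a {
		out[k] = v - b[k]
	}
	for k, v := range b {
		if _, ok := a[k]; !ok {
			out[k] = -v
		}
	}
	return out
}

// mask keeps only the resources that the quota actually limits.
func mask(r, hard ResourceList) ResourceList {
	out := ResourceList{}
	for k, v := range r {
		if _, ok := hard[k]; ok {
			out[k] = v
		}
	}
	return out
}

// check charges delta against used. It returns the new usage, or the names
// of the resources that would exceed the hard limit.
func check(used, hard, delta ResourceList) (ResourceList, []string) {
	requested := mask(delta, hard)
	newUsed := ResourceList{}
	for k, v := range used {
		newUsed[k] = v
	}
	var exceeded []string
	for k, v := range requested {
		newUsed[k] += v
		if newUsed[k] > hard[k] { // equal to the limit is still allowed
			exceeded = append(exceeded, k)
		}
	}
	if len(exceeded) > 0 {
		return nil, exceeded
	}
	return newUsed, nil
}

func main() {
	hard := ResourceList{"pods": 10, "cpu": 4000}
	used := ResourceList{"pods": 9, "cpu": 3000}
	// Updating a pod from 500m to 1500m CPU charges only the difference.
	delta := subtract(ResourceList{"pods": 1, "cpu": 1500}, ResourceList{"pods": 1, "cpu": 500})
	newUsed, exceeded := check(used, hard, delta)
	fmt.Println(newUsed, exceeded)
}
```

In this example the update raises CPU usage from 3000 to exactly 4000, which is admitted because the comparison is "less than or equal".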

checkQuotas()

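checkQuotas() first calls checkRequest() for each request to accumulate its resource usage, then writes back the quotas whose usage changed. If a write fails, it re-reads the quotas and retries until remainingRetries is used up. See the comments in the code for the detailed steps.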

func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttributes []*admissionWaiter, remainingRetries int) {
	// yet another copy to compare against originals to see if we actually have deltas
	originalQuotas, err := copyQuotas(quotas)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	atLeastOneChanged := false
	for i := range admissionAttributes {
		admissionAttribute := admissionAttributes[i]
		// Charge the request against the quotas; err is non-nil if, for example, the quota is exceeded.
		newQuotas, err := e.checkRequest(quotas, admissionAttribute.attributes)
		// If err is non-nil (for example quota exceeded), reject this request and move on to the next one.
		if err != nil {
			admissionAttribute.result = err
			continue
		}
		// if the new quotas are the same as the old quotas, then this particular one doesn't issue any updates
		// that means that no quota docs applied, so it can get a pass
		atLeastOneChangeForThisWaiter := false
		// Check whether this request changed any quota.
		for j := range newQuotas {
			if !quota.Equals(quotas[j].Status.Used, newQuotas[j].Status.Used) {
				atLeastOneChanged = true
				atLeastOneChangeForThisWaiter = true
				break
			}
		}
		if !atLeastOneChangeForThisWaiter {
			admissionAttribute.result = nil
		}
		quotas = newQuotas
	}
	// if none of the requests changed anything, there's no reason to issue an update, just fail them all now
	if !atLeastOneChanged {
		return
	}
	// now go through and try to issue updates. Things get a little weird here:
	// 1. check to see if the quota changed. If not, skip.
	// 2. if the quota changed and the update passes, be happy
	// 3. if the quota changed and the update fails, add the original to a retry list
	var updatedFailedQuotas []api.ResourceQuota
	var lastErr error
	for i := range quotas {
		newQuota := quotas[i]
		// if this quota didn't have its status changed, skip it
		if quota.Equals(originalQuotas[i].Status.Used, newQuota.Status.Used) {
			continue
		}
		// Update the resourceQuota.
		if err := e.quotaAccessor.UpdateQuotaStatus(&newQuota); err != nil {
			updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
			lastErr = err
		}
	}
	// All resource quota updates succeeded.
	if len(updatedFailedQuotas) == 0 {
		// all the updates succeeded. At this point, anything with the default deny error was just waiting to
		// get a successful update, so we can mark and notify
		for _, admissionAttribute := range admissionAttributes {
			if IsDefaultDeny(admissionAttribute.result) {
				admissionAttribute.result = nil
			}
		}
		return
	}
	// at this point, errors are fatal. Update all waiters without status to failed and return
	// The resource quota update did not succeed.
	if remainingRetries <= 0 {
		for _, admissionAttribute := range admissionAttributes {
			if IsDefaultDeny(admissionAttribute.result) {
				admissionAttribute.result = lastErr
			}
		}
		return
	}
	// this retry logic has the same bug that its possible to be checking against quota in a state that never actually exists where
	// you've added a new documented, then updated an old one, your resource matches both and you're only checking one
	// updates for these quota names failed. Get the current quotas in the namespace, compare by name, check to see if the
	// resource versions have changed. If not, we're going to fall through an fail everything. If they all have, then we can try again
	newQuotas, err := e.quotaAccessor.GetQuotas(quotas[0].Namespace)
	if err != nil {
		// this means that updates failed. Anything with a default deny error has failed and we need to let them know
		for _, admissionAttribute := range admissionAttributes {
			if IsDefaultDeny(admissionAttribute.result) {
				admissionAttribute.result = lastErr
			}
		}
		return
	}
	// this logic goes through our cache to find the new version of all quotas that failed update. If something has been removed
	// it is skipped on this retry. After all, you removed it.
	quotasToCheck := []api.ResourceQuota{}
	for _, newQuota := range newQuotas {
		for _, oldQuota := range updatedFailedQuotas {
			if newQuota.Name == oldQuota.Name {
				quotasToCheck = append(quotasToCheck, newQuota)
				break
			}
		}
	}
	e.checkQuotas(quotasToCheck, admissionAttributes, remainingRetries-1)
}
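
The retry behaviour of checkQuotas() follows the usual optimistic-concurrency pattern: attempt the update, and if it fails because the object changed, re-read the latest copy, re-check, and try again with one retry fewer. Below is a deliberately small sketch of that pattern for a single quota and a single request. The quota and store types, errConflict, and charge are hypothetical, and the integer version field stands in for the resource version.

```go
package main

import (
	"errors"
	"fmt"
)

// quota and store are hypothetical stand-ins for a ResourceQuota object and
// the API server that holds it.
type quota struct {
	version int
	used    int64
	hard    int64
}

type store struct{ current quota }

var errConflict = errors.New("conflict: quota was modified")

// update succeeds only when the caller based its change on the latest version.
func (s *store) update(q quota) error {
	if q.version != s.current.version {
		return errConflict
	}
	q.version++
	s.current = q
	return nil
}

// charge adds delta to the usage of q and writes it back. On a failed write
// it re-reads the quota and retries while retries remain.
func charge(s *store, q quota, delta int64, remainingRetries int) error {
	if q.used+delta > q.hard {
		return fmt.Errorf("exceeded quota: requested %d, used %d, limited %d", delta, q.used, q.hard)
	}
	q.used += delta
	err := s.update(q)
	if err == nil {
		return nil
	}
	if remainingRetries <= 0 {
		return err
	}
	// Re-check against the fresh copy, because the usage may have changed.
	return charge(s, s.current, delta, remainingRetries-1)
}

func main() {
	s := &store{current: quota{version: 7, used: 3, hard: 10}}
	stale := quota{version: 6, used: 2, hard: 10} // cached copy, one version behind
	err := charge(s, stale, 4, 3)
	fmt.Println(err, s.current.used)
}
```

Re-running the whole check on retry matters: the fresh copy may show higher usage, so a request that fit against the stale copy can still be rejected.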

Summary

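In resourcequota, an evaluator is the concept used to compute the resources of an object, for example podEvaluator. Among them is a special one, quotaEvaluator, which accumulates quota usage across all requests in a namespace (a request is rejected if adding it would exceed the quota). If a quota's usage has changed, quotaEvaluator writes the updated quota back.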