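The previous analysis explained how an evaluator computes the resources an object consumes. This one covers how Kubernetes decides whether enough quota is left and how it updates quota usage.
Registration
As noted earlier, every admission plugin must register itself with the plugins registry in the admission package. The registration code for resourcequota is in /plugin/pkg/admission/resourcequota/admission.go: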
    func init() {
        admission.RegisterPlugin("ResourceQuota",
            func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
                registry := install.NewRegistry(client, nil)
                return NewResourceQuota(client, registry, 5, make(chan struct{}))
            })
    }
In init(), the plugin is built by first creating a registry and then calling NewResourceQuota() to produce a quotaAdmission.
quotaAdmission
quotaAdmission wraps admission.Handler and an Evaluator (this Evaluator is the quota admission Evaluator, not the Evaluator from the previous analysis). It is defined in /plugin/pkg/admission/resourcequota/admission.go:
    type quotaAdmission struct {
        *admission.Handler

        evaluator Evaluator
    }
Now let's look at the constructor for quotaAdmission:
    func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
        quotaAccessor, err := newQuotaAccessor(client)
        if err != nil {
            return nil, err
        }
        go quotaAccessor.Run(stopCh)

        evaluator := NewQuotaEvaluator(quotaAccessor, registry, nil, numEvaluators, stopCh)

        return &quotaAdmission{
            Handler:   admission.NewHandler(admission.Create, admission.Update),
            evaluator: evaluator,
        }, nil
    }
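The constructor creates a quotaAccessor and starts it in a goroutine, builds a quotaEvaluator with NewQuotaEvaluator(), then wraps the evaluator in a quotaAdmission and returns it.
quotaAdmission also provides the entry point for the quota check: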
    func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
        // Requests that target a subresource are not checked against quota.
        if a.GetSubresource() != "" {
            return nil
        }

        return q.evaluator.Evaluate(a)
    }
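Admit() decides whether a request is accepted. It lets subresource requests through and otherwise delegates to the quotaEvaluator's Evaluate().
quotaEvaluator
quotaEvaluator is the struct that processes the actual requests. It is defined in /plugin/pkg/admission/resourcequota/controller.go: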
    type quotaEvaluator struct {
        quotaAccessor QuotaAccessor
        lockAquisitionFunc func([]api.ResourceQuota) func()

        // registry that knows how to measure usage for objects
        registry quota.Registry

        // TODO these are used together to bucket items by namespace and then batch them up for processing.
        // The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
        // We could move this into a library if another component needed it.
        // queue is indexed by namespace, so that we bundle up on a per-namespace basis
        queue      *workqueue.Type
        workLock   sync.Mutex
        work       map[string][]*admissionWaiter
        dirtyWork  map[string][]*admissionWaiter
        inProgress sets.String

        // controls the run method so that we can cleanly conform to the Evaluator interface
        workers int
        stopCh  <-chan struct{}
        init    sync.Once
    }
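Where:
- quotaAccessor: fetches the quota objects;
- registry: manages the usage calculators such as podEvaluator;
- queue: the queue of namespaces waiting to be processed;
- work: the pending requests, keyed by namespace;
- dirtyWork: requests that arrive while their namespace is being processed are parked here;
- inProgress: marks the namespaces currently being processed;
- workers: the number of goroutines that process requests.
quotaEvaluator starts a fixed number of workers. Every request is stored under its namespace key in either work or dirtyWork (dirtyWork if that namespace is currently being processed). A worker takes all requests for one namespace from work and updates the resource quota in a single pass. If a request would exceed the quota, its result is set to an error. When a worker finishes a namespace, all requests parked under that namespace in dirtyWork are moved to work.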
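The bookkeeping described above can be sketched in isolation. The following is a minimal illustration, not the Kubernetes code: the batcher type and its add, take and complete methods are hypothetical names, requests are plain strings, and the workqueue is left out so that only the work / dirtyWork / inProgress transitions remain.

```go
package main

import (
	"fmt"
	"sync"
)

// batcher is a hypothetical, simplified stand-in for the bookkeeping in
// quotaEvaluator: requests are plain strings and there is no workqueue.
type batcher struct {
	mu         sync.Mutex
	work       map[string][]string // requests waiting for a worker, keyed by namespace
	dirtyWork  map[string][]string // requests that arrived while the namespace was busy
	inProgress map[string]bool     // namespaces a worker is currently handling
}

func newBatcher() *batcher {
	return &batcher{
		work:       map[string][]string{},
		dirtyWork:  map[string][]string{},
		inProgress: map[string]bool{},
	}
}

// add parks the request in dirtyWork when its namespace is busy, else in work.
func (b *batcher) add(ns, req string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.inProgress[ns] {
		b.dirtyWork[ns] = append(b.dirtyWork[ns], req)
		return
	}
	b.work[ns] = append(b.work[ns], req)
}

// take hands the whole pending batch for ns to a worker and marks ns busy.
func (b *batcher) take(ns string) []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	batch := b.work[ns]
	delete(b.work, ns)
	if len(batch) > 0 {
		b.inProgress[ns] = true
	}
	return batch
}

// complete promotes the parked requests to work and marks ns idle again.
func (b *batcher) complete(ns string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if parked := b.dirtyWork[ns]; len(parked) > 0 {
		b.work[ns] = parked
	}
	delete(b.dirtyWork, ns)
	delete(b.inProgress, ns)
}

func main() {
	b := newBatcher()
	b.add("dev", "pod-a")
	b.add("dev", "pod-b")
	first := b.take("dev") // one worker gets both requests
	b.add("dev", "pod-c")  // arrives mid-batch, so it is parked
	b.complete("dev")
	second := b.take("dev")
	fmt.Println(first, second)
}
```

Batching per namespace means one worker reads and writes a namespace's quotas at a time, which is what keeps concurrent requests in the same namespace from racing on the same quota object.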
NewQuotaEvaluator()
NewQuotaEvaluator() creates a quotaEvaluator.
    func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator {
        return &quotaEvaluator{
            quotaAccessor:      quotaAccessor,
            lockAquisitionFunc: lockAquisitionFunc,

            registry: registry,

            queue:      workqueue.NewNamed("admission_quota_controller"),
            work:       map[string][]*admissionWaiter{},
            dirtyWork:  map[string][]*admissionWaiter{},
            inProgress: sets.String{},

            workers: workers,
            stopCh:  stopCh,
        }
    }
Evaluate()
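Evaluate() starts the quotaEvaluator's controller, wraps the attributes in a waiter, and adds the waiter to the quotaEvaluator through addWork(). It then blocks until the waiter is finished or 10 seconds have passed. Note that go e.run() is executed only once, because it is guarded by sync.Once.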
    func (e *quotaEvaluator) Evaluate(a admission.Attributes) error {
        // Start the workers lazily; sync.Once ensures run() is launched only once.
        e.init.Do(func() {
            go e.run()
        })

        // If no evaluator is registered for this kind, quota does not apply.
        evaluators := e.registry.Evaluators()
        evaluator, found := evaluators[a.GetKind().GroupKind()]
        if !found {
            return nil
        }

        // Skip operations for which the evaluator reports no resources.
        op := a.GetOperation()
        operationResources := evaluator.OperationResources(op)
        if len(operationResources) == 0 {
            return nil
        }

        // Queue the request, then wait for a worker to finish it or for the timeout.
        waiter := newAdmissionWaiter(a)
        e.addWork(waiter)

        select {
        case <-waiter.finished:
        case <-time.After(10 * time.Second):
            return fmt.Errorf("timeout")
        }

        return waiter.result
    }
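The blocking step at the end of Evaluate() is a common Go pattern: a result slot paired with a channel that is closed when the result is final. Here is a small sketch of that pattern on its own. The waiter type and the await function are hypothetical reductions for illustration; the real admissionWaiter also carries the admission attributes.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waiter is a hypothetical, reduced version of admissionWaiter: a result slot
// plus a channel that the worker closes once the result is final.
type waiter struct {
	finished chan struct{}
	result   error
}

// await blocks until the waiter is finished or the timeout elapses.
func await(w *waiter, timeout time.Duration) error {
	select {
	case <-w.finished:
		return w.result
	case <-time.After(timeout):
		return errors.New("timeout")
	}
}

func main() {
	done := &waiter{finished: make(chan struct{})}
	go func() {
		done.result = nil    // the worker writes the result first
		close(done.finished) // then closes the channel to publish it
	}()
	fmt.Println(await(done, time.Second))

	stuck := &waiter{finished: make(chan struct{})} // nobody ever closes this one
	fmt.Println(await(stuck, 50*time.Millisecond))
}
```

Because the worker writes result before closing finished, the caller can read result without a lock once the receive returns.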
addWork()
addWork() adds an admissionWaiter to the quotaEvaluator. It reads the waiter's namespace and adds that namespace to queue to signal that its quota needs processing. If the namespace is currently being processed, the waiter goes into dirtyWork; otherwise it goes into work.
    func (e *quotaEvaluator) addWork(a *admissionWaiter) {
        e.workLock.Lock()
        defer e.workLock.Unlock()

        ns := a.attributes.GetNamespace()
        e.queue.Add(ns)

        if e.inProgress.Has(ns) {
            e.dirtyWork[ns] = append(e.dirtyWork[ns], a)
            return
        }

        e.work[ns] = append(e.work[ns], a)
    }
getWork()
getWork() takes the next namespace to process from queue, fetches the requests pending for that namespace, and inserts the namespace into inProgress to mark it as being processed.
    func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
        uncastNS, shutdown := e.queue.Get()
        if shutdown {
            return "", []*admissionWaiter{}, shutdown
        }
        ns := uncastNS.(string)

        e.workLock.Lock()
        defer e.workLock.Unlock()

        // Take everything pending for this namespace out of the maps.
        work := e.work[ns]
        delete(e.work, ns)
        delete(e.dirtyWork, ns)

        if len(work) != 0 {
            // Mark the namespace as busy so that new arrivals go to dirtyWork.
            e.inProgress.Insert(ns)
            return ns, work, false
        }

        // Nothing to do for this namespace: release the queue key right away.
        e.queue.Done(ns)
        e.inProgress.Delete(ns)
        return ns, []*admissionWaiter{}, false
    }
completeWork()
completeWork() moves the requests parked in dirtyWork into work, marks the namespace as done in queue, and removes it from inProgress.
    func (e *quotaEvaluator) completeWork(ns string) {
        e.workLock.Lock()
        defer e.workLock.Unlock()

        e.queue.Done(ns)
        e.work[ns] = e.dirtyWork[ns]
        delete(e.dirtyWork, ns)
        e.inProgress.Delete(ns)
    }
run()
run() starts workers goroutines to process requests; each goroutine runs the doWork loop. When stopCh is closed, run() shuts down the queue.
    func (e *quotaEvaluator) run() {
        defer utilruntime.HandleCrash()

        for i := 0; i < e.workers; i++ {
            go wait.Until(e.doWork, time.Second, e.stopCh)
        }
        <-e.stopCh
        glog.Infof("Shutting down quota evaluator")
        e.queue.ShutDown()
    }
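The shape of run(), a fixed number of goroutines that all stop when one channel is closed, can be tried out with the standard library alone. This sketch is an illustration under simplifying assumptions: plain channels replace the workqueue and wait.Until, and the pool type is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for the worker side of quotaEvaluator.
type pool struct {
	wg      sync.WaitGroup
	mu      sync.Mutex
	handled int // number of jobs the workers have processed
}

// run starts the given number of workers. Each worker handles jobs until
// stopCh is closed.
func (p *pool) run(workers int, jobs <-chan string, stopCh <-chan struct{}) {
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-stopCh:
					return
				case <-jobs:
					p.mu.Lock()
					p.handled++
					p.mu.Unlock()
				}
			}
		}()
	}
}

func main() {
	jobs := make(chan string)
	stopCh := make(chan struct{})
	p := &pool{}
	p.run(3, jobs, stopCh)

	for _, ns := range []string{"dev", "test", "prod"} {
		jobs <- ns // unbuffered: returns once a worker has taken the job
	}
	close(stopCh) // a single close stops every worker
	p.wg.Wait()
	fmt.Println(p.handled)
}
```

Closing a channel is observed by every receiver, which is why one stopCh is enough to stop any number of workers.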
doWork()
doWork() takes one namespace and its pending requests from the quotaEvaluator, calls checkAttributes() to run the quota check on those requests, and finally calls completeWork() to move the requests in dirtyWork over to work.
    func (e *quotaEvaluator) doWork() {
        workFunc := func() bool {
            ns, admissionAttributes, quit := e.getWork()
            if quit {
                return true
            }
            defer e.completeWork(ns)
            if len(admissionAttributes) == 0 {
                return false
            }
            e.checkAttributes(ns, admissionAttributes)
            return false
        }
        for {
            if quit := workFunc(); quit {
                glog.Infof("quota evaluator worker shutdown")
                return
            }
        }
    }
checkAttributes()
checkAttributes() fetches the quotas in the namespace, calls checkQuotas() to run the quota check for the requests, and finally notifies the waiters that the check is complete.
    func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admissionWaiter) {
        // Notify every waiter on the way out, whatever the outcome.
        defer func() {
            for _, admissionAttribute := range admissionAttributes {
                close(admissionAttribute.finished)
            }
        }()

        // If the quotas cannot be read, every request gets that error.
        quotas, err := e.quotaAccessor.GetQuotas(ns)
        if err != nil {
            for _, admissionAttribute := range admissionAttributes {
                admissionAttribute.result = err
            }
            return
        }
        // No quota in this namespace: admit everything.
        if len(quotas) == 0 {
            for _, admissionAttribute := range admissionAttributes {
                admissionAttribute.result = nil
            }
            return
        }

        if e.lockAquisitionFunc != nil {
            releaseLocks := e.lockAquisitionFunc(quotas)
            defer releaseLocks()
        }

        // Allow up to 3 retries when a quota update fails.
        e.checkQuotas(quotas, admissionAttributes, 3)
    }
checkRequest()
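checkRequest() computes the resources a request needs and compares them against the quota. If used + requested ≤ hard limit, it returns the new usage. In effect, checkRequest() adds one request's usage to the quotas in the namespace (there may be several). The comments in the code walk through the steps.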
    func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) {
        namespace := a.GetNamespace()
        evaluators := e.registry.Evaluators()
        evaluator, found := evaluators[a.GetKind().GroupKind()]
        if !found {
            return quotas, nil
        }

        op := a.GetOperation()
        operationResources := evaluator.OperationResources(op)
        if len(operationResources) == 0 {
            return quotas, nil
        }

        // Find the quotas that apply to this object. Reject the request if the
        // object violates a quota's constraints or if a matching quota has no
        // usage stats yet.
        inputObject := a.GetObject()
        interestingQuotaIndexes := []int{}
        for i := range quotas {
            resourceQuota := quotas[i]
            match := evaluator.Matches(&resourceQuota, inputObject)
            if !match {
                continue
            }

            hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
            evaluatorResources := evaluator.MatchesResources()
            requiredResources := quota.Intersection(hardResources, evaluatorResources)
            if err := evaluator.Constraints(requiredResources, inputObject); err != nil {
                return nil, admission.NewForbidden(a, fmt.Errorf("failed quota: %s: %v", resourceQuota.Name, err))
            }
            if !hasUsageStats(&resourceQuota) {
                return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name))
            }

            interestingQuotaIndexes = append(interestingQuotaIndexes, i)
        }
        // No quota applies, so the request has no effect on quota.
        if len(interestingQuotaIndexes) == 0 {
            return quotas, nil
        }

        // Fill in the namespace on the object if it is missing.
        if accessor, err := meta.Accessor(inputObject); namespace != "" && err == nil {
            if accessor.GetNamespace() == "" {
                accessor.SetNamespace(namespace)
            }
        }

        // Measure the usage of the incoming object; it must never be negative.
        deltaUsage := evaluator.Usage(inputObject)
        if negativeUsage := quota.IsNegative(deltaUsage); len(negativeUsage) > 0 {
            return nil, admission.NewForbidden(a, fmt.Errorf("quota usage is negative for resource(s): %s", prettyPrintResourceNames(negativeUsage)))
        }

        // On update, subtract the usage of the previous version when the old
        // object has a resourceVersion; otherwise the full usage is charged.
        if admission.Update == op {
            prevItem := a.GetOldObject()
            if prevItem == nil {
                return nil, admission.NewForbidden(a, fmt.Errorf("unable to get previous usage since prior version of object was not found"))
            }

            metadata, err := meta.Accessor(prevItem)
            if err == nil && len(metadata.GetResourceVersion()) > 0 {
                prevUsage := evaluator.Usage(prevItem)
                deltaUsage = quota.Subtract(deltaUsage, prevUsage)
            }
        }
        // Usage did not change, so the quotas are unaffected.
        if quota.IsZero(deltaUsage) {
            return quotas, nil
        }

        // Work on a copy so the input quotas are not modified.
        outQuotas, err := copyQuotas(quotas)
        if err != nil {
            return nil, err
        }

        for _, index := range interestingQuotaIndexes {
            resourceQuota := outQuotas[index]

            // Keep only the resources this quota tracks, add them to the current
            // usage, and compare the result against the hard limits.
            hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
            requestedUsage := quota.Mask(deltaUsage, hardResources)
            newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage)
            maskedNewUsage := quota.Mask(newUsage, quota.ResourceNames(requestedUsage))

            if allowed, exceeded := quota.LessThanOrEqual(maskedNewUsage, resourceQuota.Status.Hard); !allowed {
                failedRequestedUsage := quota.Mask(requestedUsage, exceeded)
                failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded)
                failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded)
                return nil, admission.NewForbidden(a,
                    fmt.Errorf("exceeded quota: %s, requested: %s, used: %s, limited: %s",
                        resourceQuota.Name,
                        prettyPrint(failedRequestedUsage),
                        prettyPrint(failedUsed),
                        prettyPrint(failedHard)))
            }

            // The request fits: record the new usage in the copy.
            outQuotas[index].Status.Used = newUsage
        }

        return outQuotas, nil
    }
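The arithmetic at the core of checkRequest() (mask the request to the resources the quota tracks, add it to the current usage, compare against the hard limits) is easy to lose among the API calls. The sketch below replays it with plain integers. resourceList and charge are hypothetical names for this illustration; the real code works on api.ResourceList quantities through the quota package helpers.

```go
package main

import (
	"fmt"
	"sort"
)

// resourceList is a hypothetical simplification of api.ResourceList:
// resource name -> integer quantity.
type resourceList map[string]int64

// charge returns a copy of used with requested added for every resource that
// hard constrains. If any resource would exceed its hard limit, it returns nil
// and the sorted names of the exceeded resources instead.
func charge(hard, used, requested resourceList) (resourceList, []string) {
	newUsed := resourceList{}
	for name, v := range used {
		newUsed[name] = v
	}
	var exceeded []string
	for name, delta := range requested {
		limit, tracked := hard[name]
		if !tracked {
			continue // the quota does not constrain this resource
		}
		newUsed[name] += delta
		if newUsed[name] > limit {
			exceeded = append(exceeded, name)
		}
	}
	if len(exceeded) > 0 {
		sort.Strings(exceeded)
		return nil, exceeded
	}
	return newUsed, nil
}

func main() {
	hard := resourceList{"pods": 10, "cpu": 4000}
	used := resourceList{"pods": 9, "cpu": 3000}

	// 9+1 pods and 3000+500 cpu both fit; memory is ignored because hard does not list it.
	newUsed, exceeded := charge(hard, used, resourceList{"pods": 1, "cpu": 500, "memory": 1 << 30})
	fmt.Println(newUsed, exceeded)

	// 9+2 pods would pass the limit of 10, so the request is rejected.
	newUsed, exceeded = charge(hard, used, resourceList{"pods": 2})
	fmt.Println(newUsed, exceeded)
}
```

Note the boundary case: a request that brings usage exactly to the hard limit is admitted, matching the LessThanOrEqual comparison in checkRequest().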
checkQuotas()
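checkQuotas() first calls checkRequest() to accumulate the usage of each request, then persists the quotas whose usage changed. If an update fails, it re-reads the quotas and retries until remainingRetries runs out. The comments in the code walk through the steps.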
    func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttributes []*admissionWaiter, remainingRetries int) {
        // Keep a copy of the quotas as read, to detect which ones change.
        originalQuotas, err := copyQuotas(quotas)
        if err != nil {
            utilruntime.HandleError(err)
            return
        }

        // Accumulate each request's usage into quotas. A request that fails
        // the check keeps its error and does not change the running totals.
        atLeastOneChanged := false
        for i := range admissionAttributes {
            admissionAttribute := admissionAttributes[i]
            newQuotas, err := e.checkRequest(quotas, admissionAttribute.attributes)
            if err != nil {
                admissionAttribute.result = err
                continue
            }

            atLeastOneChangeForThisWaiter := false
            for j := range newQuotas {
                if !quota.Equals(quotas[j].Status.Used, newQuotas[j].Status.Used) {
                    atLeastOneChanged = true
                    atLeastOneChangeForThisWaiter = true
                    break
                }
            }

            // A request that changes no quota can be admitted right away.
            if !atLeastOneChangeForThisWaiter {
                admissionAttribute.result = nil
            }
            quotas = newQuotas
        }

        // Nothing changed, so there is nothing to persist.
        if !atLeastOneChanged {
            return
        }

        // Persist every quota whose usage changed and remember the failures.
        var updatedFailedQuotas []api.ResourceQuota
        var lastErr error
        for i := range quotas {
            newQuota := quotas[i]
            if quota.Equals(originalQuotas[i].Status.Used, newQuota.Status.Used) {
                continue
            }
            if err := e.quotaAccessor.UpdateQuotaStatus(&newQuota); err != nil {
                updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
                lastErr = err
            }
        }

        // All updates succeeded: admit every waiter still marked as default deny.
        if len(updatedFailedQuotas) == 0 {
            for _, admissionAttribute := range admissionAttributes {
                if IsDefaultDeny(admissionAttribute.result) {
                    admissionAttribute.result = nil
                }
            }
            return
        }

        // Out of retries: fail those waiters with the last update error.
        if remainingRetries <= 0 {
            for _, admissionAttribute := range admissionAttributes {
                if IsDefaultDeny(admissionAttribute.result) {
                    admissionAttribute.result = lastErr
                }
            }
            return
        }

        // Re-read the quotas and retry against the ones whose update failed.
        newQuotas, err := e.quotaAccessor.GetQuotas(quotas[0].Namespace)
        if err != nil {
            for _, admissionAttribute := range admissionAttributes {
                if IsDefaultDeny(admissionAttribute.result) {
                    admissionAttribute.result = lastErr
                }
            }
            return
        }

        quotasToCheck := []api.ResourceQuota{}
        for _, newQuota := range newQuotas {
            for _, oldQuota := range updatedFailedQuotas {
                if newQuota.Name == oldQuota.Name {
                    quotasToCheck = append(quotasToCheck, newQuota)
                    break
                }
            }
        }
        e.checkQuotas(quotasToCheck, admissionAttributes, remainingRetries-1)
    }
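The retry at the end of checkQuotas() matters because the usage read earlier may be stale by the time the update is written. The following sketch shows the idea with an in-memory store that rejects writes based on an old version. Everything here is hypothetical scaffolding for illustration (quotaRecord, store, chargeWithRetry, a single integer resource); it assumes a version check as the reason an update fails, which the original code does not spell out.

```go
package main

import (
	"errors"
	"fmt"
)

// quotaRecord is a hypothetical stored quota: usage plus a version that is
// used to detect concurrent writers.
type quotaRecord struct {
	used    int64
	version int
}

// store is a hypothetical in-memory stand-in for the quota accessor.
type store struct {
	record quotaRecord
}

var errConflict = errors.New("conflict: quota was modified")

func (s *store) get() quotaRecord { return s.record }

// update succeeds only if the caller based its change on the latest version.
func (s *store) update(r quotaRecord) error {
	if r.version != s.record.version {
		return errConflict
	}
	r.version++
	s.record = r
	return nil
}

// chargeWithRetry adds delta to current.used if the sum fits under hard.
// When the update fails it re-reads the record and checks again, as long as
// retries remain.
func chargeWithRetry(s *store, current quotaRecord, hard, delta int64, retries int) error {
	if current.used+delta > hard {
		return errors.New("exceeded quota")
	}
	current.used += delta
	err := s.update(current)
	if err == nil {
		return nil
	}
	if retries <= 0 {
		return err
	}
	return chargeWithRetry(s, s.get(), hard, delta, retries-1)
}

func main() {
	s := &store{record: quotaRecord{used: 3, version: 1}}
	stale := s.get()
	// Another writer charges 2 units after our read, which bumps the version.
	_ = s.update(quotaRecord{used: 5, version: 1})

	// The first attempt conflicts; the retry re-reads used=5 and 5+4 fits under 10.
	err := chargeWithRetry(s, stale, 10, 4, 3)
	fmt.Println(err, s.get().used)

	// Same stale read again: the retry now sees used=9 and 9+4 does not fit.
	err = chargeWithRetry(s, stale, 10, 4, 3)
	fmt.Println(err, s.get().used)
}
```

The second call shows why the check is repeated and not just the write: a request that fit against the stale usage can be over the limit against the fresh one.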
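Summary
In resourcequota, an evaluator computes the resources used by a given object; podEvaluator is one example. quotaEvaluator is a special case among them: it accumulates the usage of all requests in a namespace against that namespace's quotas (rejecting any request that would exceed a quota), and when the usage of a quota changes, it writes the updated quota back.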