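The previous analysis covered how an evaluator computes the resources an object consumes. This one looks at how Kubernetes decides whether a quota is sufficient and how it updates the quota.
Registration
As noted earlier, every admission plugin must register itself with the plugins registry in admission. The registration code for resourcequota is in /plugin/pkg/admission/resourcequota/admission.go: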
```go
func init() {
	admission.RegisterPlugin("ResourceQuota",
		func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
			registry := install.NewRegistry(client, nil)
			return NewResourceQuota(client, registry, 5, make(chan struct{}))
		})
}
```
In init(), the resourcequota plugin is built by first creating a registry and then calling NewResourceQuota() to create the quotaAdmission.
quotaAdmission
quotaAdmission wraps an admission.Handler and an Evaluator (here Evaluator is the quota admission Evaluator, not the resource-usage evaluator from the previous analysis). It is defined in /plugin/pkg/admission/resourcequota/admission.go:
```go
type quotaAdmission struct {
	*admission.Handler
	evaluator Evaluator
}
```
Now for the constructor of quotaAdmission:
```go
func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
	quotaAccessor, err := newQuotaAccessor(client)
	if err != nil {
		return nil, err
	}
	go quotaAccessor.Run(stopCh)

	evaluator := NewQuotaEvaluator(quotaAccessor, registry, nil, numEvaluators, stopCh)
	return &quotaAdmission{
		Handler:   admission.NewHandler(admission.Create, admission.Update),
		evaluator: evaluator,
	}, nil
}
```
The constructor first builds a quotaEvaluator with NewQuotaEvaluator(), then wraps it in a quotaAdmission and returns it.
quotaAdmission also provides the entry point for quota checks:
```go
func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
	if a.GetSubresource() != "" {
		return nil
	}
	return q.evaluator.Evaluate(a)
}
```
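Admit() decides whether a request is accepted. It lets subresource requests through without a quota check and otherwise delegates to the quotaEvaluator's Evaluate().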
quotaEvaluator
quotaEvaluator is the struct that handles the actual requests. It is defined in /plugin/pkg/admission/resourcequota/controller.go:
```go
type quotaEvaluator struct {
	quotaAccessor QuotaAccessor

	lockAquisitionFunc func([]api.ResourceQuota) func()

	// registry that knows how to measure usage for objects
	registry quota.Registry

	// TODO these are used together to bucket items by namespace and then batch them up for processing.
	// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
	// We could move this into a library if another component needed it.
	// queue is indexed by namespace, so that we bundle up on a per-namespace basis
	queue      *workqueue.Type
	workLock   sync.Mutex
	work       map[string][]*admissionWaiter
	dirtyWork  map[string][]*admissionWaiter
	inProgress sets.String

	// controls the run method so that we can cleanly conform to the Evaluator interface
	workers int
	stopCh  <-chan struct{}

	init sync.Once
}
```
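Where:
- quotaAccessor: fetches the quota objects;
- registry: manages the usage evaluators such as podEvaluator;
- queue: the queue of namespaces waiting to be processed;
- work: the pending requests, keyed by namespace;
- dirtyWork: holds requests that arrive while their namespace is being processed;
- inProgress: the set of namespaces currently being processed;
- workers: the number of goroutines that process requests.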
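A quotaEvaluator starts a fixed number of workers. Every request is stored under its namespace key in either work or dirtyWork (dirtyWork if that namespace is currently being processed). A worker takes all requests queued under one namespace in work as a batch and updates the resource quota once for the whole batch. If a request exceeds the quota, its result is set to an error. When the worker finishes a namespace, all requests for that namespace in dirtyWork are moved to work.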
NewQuotaEvaluator()
NewQuotaEvaluator() creates a quotaEvaluator.
```go
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator {
	return &quotaEvaluator{
		quotaAccessor:      quotaAccessor,
		lockAquisitionFunc: lockAquisitionFunc,

		registry: registry,

		queue:      workqueue.NewNamed("admission_quota_controller"),
		work:       map[string][]*admissionWaiter{},
		dirtyWork:  map[string][]*admissionWaiter{},
		inProgress: sets.String{},

		workers: workers,
		stopCh:  stopCh,
	}
}
```
Evaluate()
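Evaluate() starts the quotaEvaluator's controller, wraps the attributes in a waiter, and calls addWork() to hand the waiter to the quotaEvaluator. Note that go e.run() executes only once, because it is guarded by the sync.Once field init. Evaluate() then blocks until the waiter is finished, or returns a timeout error after 10 seconds.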
```go
func (e *quotaEvaluator) Evaluate(a admission.Attributes) error {
	e.init.Do(func() {
		go e.run()
	})

	evaluators := e.registry.Evaluators()
	evaluator, found := evaluators[a.GetKind().GroupKind()]
	if !found {
		return nil
	}

	op := a.GetOperation()
	operationResources := evaluator.OperationResources(op)
	if len(operationResources) == 0 {
		return nil
	}

	waiter := newAdmissionWaiter(a)
	e.addWork(waiter)

	select {
	case <-waiter.finished:
	case <-time.After(10 * time.Second):
		return fmt.Errorf("timeout")
	}

	return waiter.result
}
```
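The definition of admissionWaiter is not shown in this analysis, so the following is only a minimal sketch of the wait-with-timeout pattern used by Evaluate(). It assumes a waiter holds a result plus a channel that is closed once the result is final, and that the result starts out as a "default deny" error (which is what the later IsDefaultDeny() checks suggest). The names waiter, newWaiter, await, demo and errDefaultDeny are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDefaultDeny is a hypothetical stand-in for the initial "deny" result.
var errDefaultDeny = errors.New("default deny")

// waiter is a simplified admissionWaiter: a result plus a channel that is
// closed once the result is final.
type waiter struct {
	finished chan struct{}
	result   error
}

func newWaiter() *waiter {
	return &waiter{finished: make(chan struct{}), result: errDefaultDeny}
}

// await blocks until the waiter is finished or the timeout elapses.
func await(w *waiter, timeout time.Duration) error {
	select {
	case <-w.finished:
		return w.result
	case <-time.After(timeout):
		return errors.New("timeout")
	}
}

// demo returns the outcome for a waiter that a worker finishes, and for one
// that nobody ever finishes.
func demo() (finished, stuck error) {
	w := newWaiter()
	go func() {
		w.result = nil // the worker admits the request
		close(w.finished)
	}()
	finished = await(w, time.Second)
	stuck = await(newWaiter(), 50*time.Millisecond)
	return finished, stuck
}

func main() {
	finished, stuck := demo()
	fmt.Println(finished, stuck)
}
```

Writing the result before closing the channel is what makes the read after the receive safe: the close happens before the receive completes.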
addWork()
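addWork() adds an admissionWaiter to the quotaEvaluator. It first reads the waiter's namespace and adds that namespace to queue, marking the namespace's quotas as needing processing. If the namespace is currently being processed, the waiter goes into dirtyWork; otherwise it goes into work.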
```go
func (e *quotaEvaluator) addWork(a *admissionWaiter) {
	e.workLock.Lock()
	defer e.workLock.Unlock()

	ns := a.attributes.GetNamespace()

	e.queue.Add(ns)

	if e.inProgress.Has(ns) {
		e.dirtyWork[ns] = append(e.dirtyWork[ns], a)
		return
	}

	e.work[ns] = append(e.work[ns], a)
}
```
getWork()
getWork() first takes a namespace that needs processing from queue, then fetches the requests for that namespace and inserts the namespace into inProgress to mark it as being processed.
```go
func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
	uncastNS, shutdown := e.queue.Get()
	if shutdown {
		return "", []*admissionWaiter{}, shutdown
	}
	ns := uncastNS.(string)

	e.workLock.Lock()
	defer e.workLock.Unlock()

	work := e.work[ns]
	delete(e.work, ns)
	delete(e.dirtyWork, ns)

	if len(work) != 0 {
		e.inProgress.Insert(ns)
		return ns, work, false
	}

	e.queue.Done(ns)
	e.inProgress.Delete(ns)
	return ns, []*admissionWaiter{}, false
}
```
completeWork()
completeWork() moves the requests parked in dirtyWork over to work and clears the namespace's in-progress mark.
```go
func (e *quotaEvaluator) completeWork(ns string) {
	e.workLock.Lock()
	defer e.workLock.Unlock()

	e.queue.Done(ns)
	e.work[ns] = e.dirtyWork[ns]
	delete(e.dirtyWork, ns)
	e.inProgress.Delete(ns)
}
```
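To make the interplay of addWork(), getWork() and completeWork() easier to follow, here is a toy, single-goroutine model of the same bookkeeping. It is a sketch under simplifying assumptions: there is no lock and no workqueue, requests are plain strings, and the type bucket with its methods add, get and complete are hypothetical names rather than part of Kubernetes.

```go
package main

import "fmt"

// bucket is a toy model of the work/dirtyWork/inProgress bookkeeping.
type bucket struct {
	work       map[string][]string
	dirtyWork  map[string][]string
	inProgress map[string]bool
}

func newBucket() *bucket {
	return &bucket{
		work:       map[string][]string{},
		dirtyWork:  map[string][]string{},
		inProgress: map[string]bool{},
	}
}

// add files a request under its namespace; requests that arrive while the
// namespace is being processed are parked in dirtyWork.
func (b *bucket) add(ns, req string) {
	if b.inProgress[ns] {
		b.dirtyWork[ns] = append(b.dirtyWork[ns], req)
		return
	}
	b.work[ns] = append(b.work[ns], req)
}

// get hands out the whole batch for ns and marks the namespace in progress.
func (b *bucket) get(ns string) []string {
	batch := b.work[ns]
	delete(b.work, ns)
	if len(batch) != 0 {
		b.inProgress[ns] = true
	}
	return batch
}

// complete promotes parked requests to work and clears the in-progress mark.
func (b *bucket) complete(ns string) {
	b.work[ns] = b.dirtyWork[ns]
	delete(b.dirtyWork, ns)
	delete(b.inProgress, ns)
}

func main() {
	b := newBucket()
	b.add("dev", "pod-a")
	b.add("dev", "pod-b")
	first := b.get("dev") // both requests form one batch
	b.add("dev", "pod-c") // arrives mid-batch, so it is parked in dirtyWork
	b.complete("dev")
	second := b.get("dev") // the parked request is picked up next round
	fmt.Println(first, second)
}
```

The point of the two maps is that a batch handed to a worker never grows while it is being checked, so one quota update can cover exactly that batch.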
run()
run() starts workers goroutines to process requests; each goroutine runs the doWork loop.
```go
func (e *quotaEvaluator) run() {
	defer utilruntime.HandleCrash()

	for i := 0; i < e.workers; i++ {
		go wait.Until(e.doWork, time.Second, e.stopCh)
	}
	<-e.stopCh
	glog.Infof("Shutting down quota evaluator")
	e.queue.ShutDown()
}
```
doWork()
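doWork() first gets a namespace and the requests queued under it from the quotaEvaluator, then calls checkAttributes() to run the quota check on those requests, and finally calls completeWork() to move the requests in dirtyWork over to work.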
```go
func (e *quotaEvaluator) doWork() {
	workFunc := func() bool {
		ns, admissionAttributes, quit := e.getWork()
		if quit {
			return true
		}

		defer e.completeWork(ns)
		if len(admissionAttributes) == 0 {
			return false
		}

		e.checkAttributes(ns, admissionAttributes)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("quota evaluator worker shutdown")
			return
		}
	}
}
```
checkAttributes()
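checkAttributes() first fetches the quotas in the namespace, then calls checkQuotas() to run the quota check for the requests, and finally notifies the waiters that the check is complete.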
```go
func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admissionWaiter) {
	// whatever happens below, signal every waiter once this function returns
	defer func() {
		for _, admissionAttribute := range admissionAttributes {
			close(admissionAttribute.finished)
		}
	}()

	// failing to read the quotas fails every request in the batch
	quotas, err := e.quotaAccessor.GetQuotas(ns)
	if err != nil {
		for _, admissionAttribute := range admissionAttributes {
			admissionAttribute.result = err
		}
		return
	}
	// no quotas in the namespace: admit everything
	if len(quotas) == 0 {
		for _, admissionAttribute := range admissionAttributes {
			admissionAttribute.result = nil
		}
		return
	}

	if e.lockAquisitionFunc != nil {
		releaseLocks := e.lockAquisitionFunc(quotas)
		defer releaseLocks()
	}

	// check the whole batch, allowing up to 3 retries
	e.checkQuotas(quotas, admissionAttributes, 3)
}
```
checkRequest()
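checkRequest() computes the resources a request needs and compares them against the quota. If used + requested <= hard limit, it returns the quotas with the updated usage. In effect, checkRequest() adds one request's resource usage to the quotas (possibly several) in the namespace. See the comments for the individual steps.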
```go
func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) {
	namespace := a.GetNamespace()
	evaluators := e.registry.Evaluators()
	// no evaluator for this kind: the quotas are returned unchanged
	evaluator, found := evaluators[a.GetKind().GroupKind()]
	if !found {
		return quotas, nil
	}

	// the evaluator tracks no resources for this operation: nothing to do
	op := a.GetOperation()
	operationResources := evaluator.OperationResources(op)
	if len(operationResources) == 0 {
		return quotas, nil
	}

	// collect the indexes of the quotas that match this object
	inputObject := a.GetObject()
	interestingQuotaIndexes := []int{}
	for i := range quotas {
		resourceQuota := quotas[i]
		match := evaluator.Matches(&resourceQuota, inputObject)
		if !match {
			continue
		}

		hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
		evaluatorResources := evaluator.MatchesResources()
		requiredResources := quota.Intersection(hardResources, evaluatorResources)
		// the object must satisfy the evaluator's constraints for the limited resources
		if err := evaluator.Constraints(requiredResources, inputObject); err != nil {
			return nil, admission.NewForbidden(a, fmt.Errorf("failed quota: %s: %v", resourceQuota.Name, err))
		}
		// a quota without usage stats is rejected with "status unknown"
		if !hasUsageStats(&resourceQuota) {
			return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name))
		}

		interestingQuotaIndexes = append(interestingQuotaIndexes, i)
	}
	if len(interestingQuotaIndexes) == 0 {
		return quotas, nil
	}

	// fill in the namespace on the object if it is missing
	if accessor, err := meta.Accessor(inputObject); namespace != "" && err == nil {
		if accessor.GetNamespace() == "" {
			accessor.SetNamespace(namespace)
		}
	}

	// usage of the incoming object
	deltaUsage := evaluator.Usage(inputObject)

	// reject negative usage
	if negativeUsage := quota.IsNegative(deltaUsage); len(negativeUsage) > 0 {
		return nil, admission.NewForbidden(a, fmt.Errorf("quota usage is negative for resource(s): %s", prettyPrintResourceNames(negativeUsage)))
	}

	// on update, charge only the difference from the previous object
	if admission.Update == op {
		prevItem := a.GetOldObject()
		if prevItem == nil {
			return nil, admission.NewForbidden(a, fmt.Errorf("unable to get previous usage since prior version of object was not found"))
		}

		// subtract the previous usage only if the old object has a resource version
		metadata, err := meta.Accessor(prevItem)
		if err == nil && len(metadata.GetResourceVersion()) > 0 {
			prevUsage := evaluator.Usage(prevItem)
			deltaUsage = quota.Subtract(deltaUsage, prevUsage)
		}
	}
	// nothing to charge
	if quota.IsZero(deltaUsage) {
		return quotas, nil
	}

	// work on a copy and return it only if every matching quota accepts the request
	outQuotas, err := copyQuotas(quotas)
	if err != nil {
		return nil, err
	}
	for _, index := range interestingQuotaIndexes {
		resourceQuota := outQuotas[index]

		// only resources that have a hard limit in this quota are charged
		hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
		requestedUsage := quota.Mask(deltaUsage, hardResources)
		newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage)
		maskedNewUsage := quota.Mask(newUsage, quota.ResourceNames(requestedUsage))

		// reject the request if any requested resource would exceed its hard limit
		if allowed, exceeded := quota.LessThanOrEqual(maskedNewUsage, resourceQuota.Status.Hard); !allowed {
			failedRequestedUsage := quota.Mask(requestedUsage, exceeded)
			failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded)
			failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded)
			return nil, admission.NewForbidden(a,
				fmt.Errorf("exceeded quota: %s, requested: %s, used: %s, limited: %s",
					resourceQuota.Name,
					prettyPrint(failedRequestedUsage),
					prettyPrint(failedUsed),
					prettyPrint(failedHard)))
		}

		// record the new usage
		outQuotas[index].Status.Used = newUsage
	}

	return outQuotas, nil
}
```
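The arithmetic at the heart of checkRequest() can be tried out in isolation. The following is a simplified sketch, assuming quantities are plain int64 values in a map instead of api.ResourceList, and a single quota instead of several. The type usage and the functions subtract and charge are hypothetical names, not the quota package's API.

```go
package main

import (
	"fmt"
	"sort"
)

// usage maps a resource name to an integer quantity.
type usage map[string]int64

// subtract returns next minus prev per resource, as done for update requests.
func subtract(next, prev usage) usage {
	out := usage{}
	for name, v := range next {
		out[name] = v - prev[name]
	}
	for name, v := range prev {
		if _, ok := next[name]; !ok {
			out[name] = -v
		}
	}
	return out
}

// charge adds delta to used for every resource that has a hard limit.
// It returns the new usage, or the sorted names of the exceeded resources.
func charge(hard, used, delta usage) (usage, []string) {
	newUsed := usage{}
	for name, v := range used {
		newUsed[name] = v // copy, so the caller's map is untouched on failure
	}
	var exceeded []string
	for name, limit := range hard {
		d, requested := delta[name]
		if !requested {
			continue // the request does not touch this resource
		}
		newUsed[name] = used[name] + d
		if newUsed[name] > limit {
			exceeded = append(exceeded, name)
		}
	}
	if len(exceeded) > 0 {
		sort.Strings(exceeded)
		return nil, exceeded
	}
	return newUsed, nil
}

func main() {
	hard := usage{"pods": 10, "cpu": 4000}
	used := usage{"pods": 9, "cpu": 3000}

	// "gpu" has no hard limit in this quota, so it is not charged.
	newUsed, exceeded := charge(hard, used, usage{"pods": 1, "cpu": 500, "gpu": 1})
	fmt.Println(newUsed, exceeded)

	// 3000 + 1500 is over the cpu limit of 4000.
	_, exceeded = charge(hard, used, usage{"cpu": 1500})
	fmt.Println(exceeded)

	// An update from 500 to 700 is charged as 200.
	fmt.Println(subtract(usage{"cpu": 700}, usage{"cpu": 500}))
}
```

Landing exactly on the limit (10 pods out of 10) is allowed, which matches the less-than-or-equal comparison in the real code.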
checkQuotas()
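checkQuotas() calls checkRequest() for each request to accumulate resource usage, then writes back every quota whose usage changed. If an update fails and retries remain, it re-reads the quotas and checks the requests again against the quotas that failed to update. See the comments for the individual steps.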
```go
func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttributes []*admissionWaiter, remainingRetries int) {
	// keep a copy of the original quotas to detect which ones changed
	originalQuotas, err := copyQuotas(quotas)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}

	atLeastOneChanged := false
	for i := range admissionAttributes {
		admissionAttribute := admissionAttributes[i]
		// charge this request against the running quota totals
		newQuotas, err := e.checkRequest(quotas, admissionAttribute.attributes)
		if err != nil {
			admissionAttribute.result = err
			continue
		}

		// a request that changes no quota can be admitted right away
		atLeastOneChangeForThisWaiter := false
		for j := range newQuotas {
			if !quota.Equals(quotas[j].Status.Used, newQuotas[j].Status.Used) {
				atLeastOneChanged = true
				atLeastOneChangeForThisWaiter = true
				break
			}
		}

		if !atLeastOneChangeForThisWaiter {
			admissionAttribute.result = nil
		}
		quotas = newQuotas
	}

	// no quota changed, so there is nothing to persist
	if !atLeastOneChanged {
		return
	}

	// persist every quota whose usage differs from the original
	var updatedFailedQuotas []api.ResourceQuota
	var lastErr error
	for i := range quotas {
		newQuota := quotas[i]
		if quota.Equals(originalQuotas[i].Status.Used, newQuota.Status.Used) {
			continue
		}

		if err := e.quotaAccessor.UpdateQuotaStatus(&newQuota); err != nil {
			updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
			lastErr = err
		}
	}

	// all updates succeeded: admit every waiter whose result is still the default deny
	if len(updatedFailedQuotas) == 0 {
		for _, admissionAttribute := range admissionAttributes {
			if IsDefaultDeny(admissionAttribute.result) {
				admissionAttribute.result = nil
			}
		}
		return
	}

	// out of retries: fail the undecided waiters with the last update error
	if remainingRetries <= 0 {
		for _, admissionAttribute := range admissionAttributes {
			if IsDefaultDeny(admissionAttribute.result) {
				admissionAttribute.result = lastErr
			}
		}
		return
	}

	// re-read the quotas of the namespace before retrying
	newQuotas, err := e.quotaAccessor.GetQuotas(quotas[0].Namespace)
	if err != nil {
		for _, admissionAttribute := range admissionAttributes {
			if IsDefaultDeny(admissionAttribute.result) {
				admissionAttribute.result = lastErr
			}
		}
		return
	}

	// retry only against the quotas whose update failed
	quotasToCheck := []api.ResourceQuota{}
	for _, newQuota := range newQuotas {
		for _, oldQuota := range updatedFailedQuotas {
			if newQuota.Name == oldQuota.Name {
				quotasToCheck = append(quotasToCheck, newQuota)
				break
			}
		}
	}
	e.checkQuotas(quotasToCheck, admissionAttributes, remainingRetries-1)
}
```
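The retry part of checkQuotas() follows a read, check, write, re-read pattern. The sketch below illustrates that pattern on a toy store. It assumes the write fails because the caller's snapshot is stale (optimistic concurrency with a version number), which is a plausible cause but is not spelled out in the code above. The type store, errConflict and chargeWithRetry are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("conflict")

// store is a toy quota store: an update is rejected when the caller's
// version is stale.
type store struct {
	version int
	used    int64
}

func (s *store) get() (int, int64) { return s.version, s.used }

func (s *store) update(version int, used int64) error {
	if version != s.version {
		return errConflict
	}
	s.version++
	s.used = used
	return nil
}

// chargeWithRetry tries to add delta to the stored usage. After a failed
// update it re-reads the latest state and re-checks the limit, for at most
// remainingRetries further attempts.
func chargeWithRetry(s *store, version int, used, delta, hard int64, remainingRetries int) error {
	if used+delta > hard {
		return errors.New("exceeded quota")
	}
	err := s.update(version, used+delta)
	if err == nil {
		return nil
	}
	if remainingRetries <= 0 {
		return err
	}
	version, used = s.get() // refresh, then evaluate against fresh usage
	return chargeWithRetry(s, version, used, delta, hard, remainingRetries-1)
}

func main() {
	s := &store{version: 5, used: 3}

	// The caller holds a stale snapshot (version 4, used 2); one retry fixes it.
	err := chargeWithRetry(s, 4, 2, 1, 10, 3)
	fmt.Println(err, s.used)

	// With no retries left, the conflict is returned to the caller.
	err = chargeWithRetry(s, 0, 0, 1, 10, 0)
	fmt.Println(err, s.used)
}
```

Re-checking the limit after the refresh matters: usage may have grown in the meantime, so a request that fit the stale snapshot may no longer fit.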
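Summary
resourcequota uses evaluators, such as podEvaluator, to compute the resources of an object. Alongside these there is a special one, quotaEvaluator, which accumulates the usage of all requests in a namespace against that namespace's quotas and rejects any request that would exceed a quota. When a quota's usage changes, quotaEvaluator writes the updated quota back.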