This article analyzes the core of kube-scheduler, the scheduler itself: how a scheduler is constructed and how it schedules Pods.
ConfigFactory
A ConfigFactory produces a scheduler Config. Let's first look at how a ConfigFactory is created; the constructor is defined in /plugin/pkg/scheduler/factory/factory.go:
```go
func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory {
	stopEverything := make(chan struct{})
	schedulerCache := schedulercache.New(30*time.Second, stopEverything)

	informerFactory := informers.NewSharedInformerFactory(client, 0)
	pvcInformer := informerFactory.PersistentVolumeClaims()

	c := &ConfigFactory{
		Client:             client,
		PodQueue:           cache.NewFIFO(cache.MetaNamespaceKeyFunc),
		ScheduledPodLister: &cache.StoreToPodLister{},
		informerFactory:    informerFactory,
		NodeLister:         &cache.StoreToNodeLister{},
		PVLister:           &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
		PVCLister:          pvcInformer.Lister(),
		pvcPopulator:       pvcInformer.Informer().GetController(),
		ServiceLister:      &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
		ControllerLister:   &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
		ReplicaSetLister:   &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
		schedulerCache:     schedulerCache,
		StopEverything:     stopEverything,
		SchedulerName:      schedulerName,
		HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
		FailureDomains:                 failureDomains,
	}

	c.PodLister = schedulerCache

	c.ScheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
		c.createAssignedNonTerminatedPodLW(),
		&api.Pod{},
		0,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.addPodToCache,
			UpdateFunc: c.updatePodInCache,
			DeleteFunc: c.deletePodFromCache,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	c.NodeLister.Store, c.nodePopulator = cache.NewInformer(
		c.createNodeLW(),
		&api.Node{},
		0,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.addNodeToCache,
			UpdateFunc: c.updateNodeInCache,
			DeleteFunc: c.deleteNodeFromCache,
		},
	)

	c.PVLister.Store, c.pvPopulator = cache.NewInformer(
		c.createPersistentVolumeLW(),
		&api.PersistentVolume{},
		0,
		cache.ResourceEventHandlerFuncs{},
	)

	c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
		c.createServiceLW(),
		&api.Service{},
		0,
		cache.ResourceEventHandlerFuncs{},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
		c.createControllerLW(),
		&api.ReplicationController{},
		0,
		cache.ResourceEventHandlerFuncs{},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	return c
}
```
NewConfigFactory() simply fills in the fields of a ConfigFactory. The most important one is PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), a FIFO that caches the Pods that have not yet been scheduled.
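Conceptually, PodQueue is a simple producer/consumer FIFO keyed by namespace/name. The following is a minimal hand-wired sketch of that behavior (the import paths are assumed to match the in-tree pkg/client/cache and pkg/api packages used by the code above):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
)

func main() {
	// PodQueue in ConfigFactory is exactly this: a FIFO keyed by namespace/name.
	queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)

	// Normally the Reflector started in ConfigFactory.Run() calls Add(); here we
	// enqueue a Pod by hand just to show the flow.
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "nginx"}}
	if err := queue.Add(pod); err != nil {
		panic(err)
	}

	// getNextPod() does essentially this: block until an unscheduled Pod is available.
	next := cache.Pop(queue).(*api.Pod)
	fmt.Printf("popped %s/%s\n", next.Namespace, next.Name)
}
```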
A ConfigFactory can produce a scheduler Config in two ways: through CreateFromProvider() or through CreateFromConfig(). Both methods are defined in /plugin/pkg/scheduler/factory/factory.go:
```go
func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
	glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
	provider, err := GetAlgorithmProvider(providerName)
	if err != nil {
		return nil, err
	}

	return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}

func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
	glog.V(2).Infof("Creating scheduler from configuration: %v", policy)

	if err := validation.ValidatePolicy(policy); err != nil {
		return nil, err
	}

	predicateKeys := sets.NewString()
	for _, predicate := range policy.Predicates {
		glog.V(2).Infof("Registering predicate: %s", predicate.Name)
		predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
	}

	priorityKeys := sets.NewString()
	for _, priority := range policy.Priorities {
		glog.V(2).Infof("Registering priority: %s", priority.Name)
		priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
	}

	extenders := make([]algorithm.SchedulerExtender, 0)
	if len(policy.ExtenderConfigs) != 0 {
		for ii := range policy.ExtenderConfigs {
			glog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
			if extender, err := scheduler.NewHTTPExtender(&policy.ExtenderConfigs[ii], policy.APIVersion); err != nil {
				return nil, err
			} else {
				extenders = append(extenders, extender)
			}
		}
	}
	return f.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}
```
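The schedulerapi.Policy passed to CreateFromConfig() normally comes from a policy file given to kube-scheduler. The sketch below shows roughly what such a policy looks like and how it maps onto the Policy struct; the predicate and priority names are just common built-in examples, and decoding with encoding/json is a simplification rather than kube-scheduler's actual loading path:

```go
package main

import (
	"encoding/json"
	"fmt"

	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
)

// Hypothetical policy contents for illustration only.
const policyJSON = `{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    {"name": "PodFitsHostPorts"},
    {"name": "PodFitsResources"},
    {"name": "MatchNodeSelector"}
  ],
  "priorities": [
    {"name": "LeastRequestedPriority", "weight": 1},
    {"name": "BalancedResourceAllocation", "weight": 1}
  ]
}`

func main() {
	var policy schedulerapi.Policy
	if err := json.Unmarshal([]byte(policyJSON), &policy); err != nil {
		panic(err)
	}
	// CreateFromConfig(policy) registers each entry as a predicate/priority key:
	//   config, err := configFactory.CreateFromConfig(policy)
	for _, p := range policy.Predicates {
		fmt.Println("predicate:", p.Name)
	}
	for _, p := range policy.Priorities {
		fmt.Printf("priority: %s (weight %d)\n", p.Name, p.Weight)
	}
}
```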
As the code shows, CreateFromProvider() looks up the predicate and priority keys for the named provider in algorithmProviderMap, while CreateFromConfig() derives them from a policy; both end up calling CreateFromKeys(). CreateFromKeys() is also defined in /plugin/pkg/scheduler/factory/factory.go:
```go
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
	glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)

	if f.HardPodAffinitySymmetricWeight < 0 || f.HardPodAffinitySymmetricWeight > 100 {
		return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.HardPodAffinitySymmetricWeight)
	}

	predicateFuncs, err := f.GetPredicates(predicateKeys)
	if err != nil {
		return nil, err
	}

	priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys)
	if err != nil {
		return nil, err
	}

	priorityMetaProducer, err := f.GetPriorityMetadataProducer()
	if err != nil {
		return nil, err
	}

	predicateMetaProducer, err := f.GetPredicateMetadataProducer()
	if err != nil {
		return nil, err
	}

	f.Run()

	algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)

	podBackoff := podBackoff{
		perPodBackoff:   map[types.NamespacedName]*backoffEntry{},
		clock:           realClock{},
		defaultDuration: 1 * time.Second,
		maxDuration:     60 * time.Second,
	}

	return &scheduler.Config{
		SchedulerCache:      f.schedulerCache,
		NodeLister:          f.NodeLister.NodeCondition(getNodeConditionPredicate()),
		Algorithm:           algo,
		Binder:              &binder{f.Client},
		PodConditionUpdater: &podConditionUpdater{f.Client},
		NextPod: func() *api.Pod {
			return f.getNextPod()
		},
		Error:          f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
		StopEverything: f.StopEverything,
	}, nil
}
```
Besides assembling the scheduler.Config, CreateFromKeys() also calls the ConfigFactory's Run():
```go
func (f *ConfigFactory) Run() {
	cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)

	go f.scheduledPodPopulator.Run(f.StopEverything)
	go f.nodePopulator.Run(f.StopEverything)
	go f.pvPopulator.Run(f.StopEverything)
	go f.pvcPopulator.Run(f.StopEverything)
	go f.servicePopulator.Run(f.StopEverything)
	go f.controllerPopulator.Run(f.StopEverything)

	f.informerFactory.Start(f.StopEverything)

	cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Indexer, 0).RunUntil(f.StopEverything)
}
```
Let's look at the producer side of PodQueue first:
```go
cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)
```
A Reflector feeds PodQueue from the ListWatch returned by createUnassignedNonTerminatedPodLW(), which is defined in /plugin/pkg/scheduler/factory/factory.go:
```go
func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWatch {
	selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(api.PodSucceeded) + ",status.phase!=" + string(api.PodFailed))
	return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "pods", api.NamespaceAll, selector)
}
```
In other words, kube-scheduler only queues Pods with spec.nodeName=="" whose status.phase is neither "Succeeded" nor "Failed", i.e. Pods that are not yet assigned to a node and not yet terminated.
Back to Run(): after starting the Reflector that feeds PodQueue, it starts the populator controllers for the other resources (scheduled Pods, nodes, PVs, PVCs, services, replication controllers and ReplicaSets).
Since the ConfigFactory caches the unscheduled Pods, fetching the next unscheduled Pod is straightforward:
```go
func (f *ConfigFactory) getNextPod() *api.Pod {
	for {
		pod := cache.Pop(f.PodQueue).(*api.Pod)
		if f.responsibleForPod(pod) {
			glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
			return pod
		}
	}
}
```
getNextPod() simply pops Pods off PodQueue until it finds one that this scheduler instance (identified by SchedulerName) is responsible for.
Scheduler Config
The scheduler's Config struct is defined in /plugin/pkg/scheduler/scheduler.go:
```go
type Config struct {
	SchedulerCache schedulercache.Cache
	NodeLister     algorithm.NodeLister
	Algorithm      algorithm.ScheduleAlgorithm
	Binder         Binder

	PodConditionUpdater PodConditionUpdater

	NextPod func() *api.Pod

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*api.Pod, error)

	// Recorder is the EventRecorder to use
	Recorder record.EventRecorder

	// Close this to shut down the scheduler.
	StopEverything chan struct{}
}
```
A Scheduler can be built from a scheduler Config:
```go
func New(c *Config) *Scheduler {
	s := &Scheduler{
		config: c,
	}
	metrics.Register()
	return s
}
```
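To see how these pieces fit together, here is a rough wiring sketch. It is not kube-scheduler's actual server.go (client construction, the event recorder, and leader election are omitted), the literal arguments are assumed to mirror the usual defaults, and the clientset import path is assumed to follow the in-tree layout of this source tree:

```go
package example

import (
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/plugin/pkg/scheduler"
	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
)

func startScheduler(client clientset.Interface) {
	// schedulerName, hardPodAffinitySymmetricWeight and failureDomains are shown
	// here with what are assumed to be their usual default values.
	f := factory.NewConfigFactory(client, "default-scheduler", 1,
		"kubernetes.io/hostname,failure-domain.beta.kubernetes.io/zone,failure-domain.beta.kubernetes.io/region")

	// "DefaultProvider" selects the built-in predicate/priority set;
	// CreateFromConfig() would be used instead when a policy file is supplied.
	config, err := f.CreateFromProvider("DefaultProvider")
	if err != nil {
		panic(err)
	}

	// config.Recorder must still be set to a real EventRecorder before Run(),
	// since scheduleOne() emits events through it.

	sched := scheduler.New(config)
	sched.Run() // keeps calling scheduleOne() until config.StopEverything is closed
}
```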
Scheduler
Scheduler is defined in /plugin/pkg/scheduler/scheduler.go:
```go
type Scheduler struct {
	config *Config
}
```
A Scheduler is essentially just a wrapper around a Config. It defines a Run() method that starts the scheduler, also in /plugin/pkg/scheduler/scheduler.go:
```go
func (s *Scheduler) Run() {
	go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
}
```
Run() launches scheduleOne() in a goroutine; because the period passed to wait.Until is 0, scheduleOne() is invoked in a continuous loop, one Pod per iteration, until StopEverything is closed. scheduleOne() is defined as follows:
```go
func (s *Scheduler) scheduleOne() {
	pod := s.config.NextPod()

	glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
	start := time.Now()
	dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)
	if err != nil {
		glog.V(1).Infof("Failed to schedule pod: %v/%v", pod.Namespace, pod.Name)
		s.config.Error(pod, err)
		s.config.Recorder.Eventf(pod, api.EventTypeWarning, "FailedScheduling", "%v", err)
		s.config.PodConditionUpdater.Update(pod, &api.PodCondition{
			Type:   api.PodScheduled,
			Status: api.ConditionFalse,
			Reason: api.PodReasonUnschedulable,
		})
		return
	}
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))

	assumed := *pod
	assumed.Spec.NodeName = dest
	if err := s.config.SchedulerCache.AssumePod(&assumed); err != nil {
		glog.Errorf("scheduler cache AssumePod failed: %v", err)
		return
	}

	go func() {
		defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))

		b := &api.Binding{
			ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
			Target: api.ObjectReference{
				Kind: "Node",
				Name: dest,
			},
		}

		bindingStart := time.Now()
		err := s.config.Binder.Bind(b)
		if err != nil {
			glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name)
			if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil {
				glog.Errorf("scheduler cache ForgetPod failed: %v", err)
			}
			s.config.Error(pod, err)
			s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err)
			s.config.PodConditionUpdater.Update(pod, &api.PodCondition{
				Type:   api.PodScheduled,
				Status: api.ConditionFalse,
				Reason: "BindingRejected",
			})
			return
		}
		metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
		s.config.Recorder.Eventf(pod, api.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
	}()
}
```
scheduleOne() first fetches an unscheduled Pod, asks the algorithm's Schedule() (normally the GenericScheduler) for a suitable node, assumes the Pod onto that node in the scheduler cache, and finally builds a Binding object and writes it to the apiserver.
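The Binder used here is the &binder{f.Client} wired in by CreateFromKeys(); its Bind() is not shown above, but it essentially creates the Binding through the clientset's pods binding subresource. A hedged sketch of that idea (the type name and import paths are assumptions, not a copy of factory.go):

```go
package example

import (
	"k8s.io/kubernetes/pkg/api"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// binderSketch plays the role of the binder in factory.go: creating a Binding on
// the pods/<name>/binding subresource makes the apiserver set pod.Spec.NodeName,
// after which the kubelet on that node starts the Pod.
type binderSketch struct {
	Client clientset.Interface
}

func (b *binderSketch) Bind(binding *api.Binding) error {
	return b.Client.Core().Pods(binding.Namespace).Bind(binding)
}
```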
GenericScheduler
In the usual case, the Algorithm in the scheduler Config is a genericScheduler (created by NewGenericScheduler()). Its Schedule() method picks a suitable node:
```go
func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeLister) (string, error) {
	var trace *util.Trace
	if pod != nil {
		trace = util.NewTrace(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
	} else {
		trace = util.NewTrace("Scheduling <nil> pod")
	}
	defer trace.LogIfLong(100 * time.Millisecond)

	nodes, err := nodeLister.List()
	if err != nil {
		return "", err
	}
	if len(nodes) == 0 {
		return "", ErrNoNodesAvailable
	}

	err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
	if err != nil {
		return "", err
	}

	trace.Step("Computing predicates")
	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
	if err != nil {
		return "", err
	}

	if len(filteredNodes) == 0 {
		return "", &FitError{
			Pod:              pod,
			FailedPredicates: failedPredicateMap,
		}
	}

	trace.Step("Prioritizing")
	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
	priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
	if err != nil {
		return "", err
	}

	trace.Step("Selecting host")
	return g.selectHost(priorityList)
}
```
Schedule() therefore proceeds as follows (a toy illustration of this filter/score/select pipeline follows the list):
- list the nodes;
- filter them down to the feasible nodes with findNodesThatFit();
- score the feasible nodes with PrioritizeNodes();
- pick the highest-scoring node with selectHost().
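As promised, here is a toy, self-contained illustration of that filter-then-score-then-select flow; the node type and the CPU-based rules are invented for the example and are not the real predicates or priorities:

```go
package main

import "fmt"

type node struct {
	name         string
	freeMilliCPU int64
}

func main() {
	nodes := []node{{"node-a", 500}, {"node-b", 2000}, {"node-c", 1500}}
	requested := int64(1000)

	// Filtering: drop nodes that cannot run the Pod at all.
	var feasible []node
	for _, n := range nodes {
		if n.freeMilliCPU >= requested {
			feasible = append(feasible, n)
		}
	}

	// Scoring and selection: here, more free CPU left over is better.
	best, bestScore := "", int64(-1)
	for _, n := range feasible {
		score := n.freeMilliCPU - requested
		if score > bestScore {
			best, bestScore = n.name, score
		}
	}
	fmt.Println("selected:", best) // node-b
}
```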
findNodesThatFit()
Let's look at findNodesThatFit() first:
```go
func findNodesThatFit(
	pod *api.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	nodes []*api.Node,
	predicateFuncs map[string]algorithm.FitPredicate,
	extenders []algorithm.SchedulerExtender,
	metadataProducer algorithm.MetadataProducer,
) ([]*api.Node, FailedPredicateMap, error) {
	var filtered []*api.Node
	failedPredicateMap := FailedPredicateMap{}

	if len(predicateFuncs) == 0 {
		filtered = nodes
	} else {
		filtered = make([]*api.Node, len(nodes))
		errs := []error{}
		var predicateResultLock sync.Mutex
		var filteredLen int32

		meta := metadataProducer(pod, nodeNameToInfo)
		checkNode := func(i int) {
			nodeName := nodes[i].Name
			fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
			if err != nil {
				predicateResultLock.Lock()
				errs = append(errs, err)
				predicateResultLock.Unlock()
				return
			}
			if fits {
				filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
			} else {
				predicateResultLock.Lock()
				failedPredicateMap[nodeName] = failedPredicates
				predicateResultLock.Unlock()
			}
		}
		workqueue.Parallelize(16, len(nodes), checkNode)
		filtered = filtered[:filteredLen]
		if len(errs) > 0 {
			return []*api.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs)
		}
	}

	if len(filtered) > 0 && len(extenders) != 0 {
		for _, extender := range extenders {
			filteredList, failedMap, err := extender.Filter(pod, filtered)
			if err != nil {
				return []*api.Node{}, FailedPredicateMap{}, err
			}

			for failedNodeName, failedMsg := range failedMap {
				if _, found := failedPredicateMap[failedNodeName]; !found {
					failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
				}
				failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
			}
			filtered = filteredList
			if len(filtered) == 0 {
				break
			}
		}
	}
	return filtered, failedPredicateMap, nil
}
```
findNodesThatFit() checks the nodes in parallel: workqueue.Parallelize(16, len(nodes), checkNode) runs checkNode() across 16 worker goroutines. checkNode() in turn mostly delegates to podFitsOnNode():
```go
func podFitsOnNode(pod *api.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
	var failedPredicates []algorithm.PredicateFailureReason
	for _, predicate := range predicateFuncs {
		fit, reasons, err := predicate(pod, meta, info)
		if err != nil {
			err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
			return false, []algorithm.PredicateFailureReason{}, err
		}
		if !fit {
			failedPredicates = append(failedPredicates, reasons...)
		}
	}
	return len(failedPredicates) == 0, failedPredicates, nil
}
```
podFitsOnNode() runs every predicate function against the node; the node is feasible only if all of them pass.
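For reference, a predicate is just a function with the algorithm.FitPredicate signature used above. The sketch below is a hypothetical custom predicate; the disktype=ssd label requirement is invented, and registering it (for example via RegisterFitPredicate) is outside the scope of this snippet:

```go
package example

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// RequireSSD has the algorithm.FitPredicate shape that podFitsOnNode() expects:
// an unsuitable node is reported via fit=false plus failure reasons, while an
// error is reserved for unexpected problems.
func RequireSSD(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if node.Labels["disktype"] == "ssd" {
		return true, nil, nil
	}
	return false, []algorithm.PredicateFailureReason{predicates.NewFailureReason("node does not have disktype=ssd")}, nil
}
```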
PrioritizeNodes()
Next, PrioritizeNodes():
```go
func PrioritizeNodes(
	pod *api.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	meta interface{},
	priorityConfigs []algorithm.PriorityConfig,
	nodes []*api.Node,
	extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
	if len(priorityConfigs) == 0 && len(extenders) == 0 {
		result := make(schedulerapi.HostPriorityList, 0, len(nodes))
		for i := range nodes {
			hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
			if err != nil {
				return nil, err
			}
			result = append(result, hostPriority)
		}
		return result, nil
	}

	var (
		mu   = sync.Mutex{}
		wg   = sync.WaitGroup{}
		errs []error
	)
	appendError := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		errs = append(errs, err)
	}

	results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
	for range priorityConfigs {
		results = append(results, nil)
	}
	for i, priorityConfig := range priorityConfigs {
		if priorityConfig.Function != nil {
			wg.Add(1)
			go func(index int, config algorithm.PriorityConfig) {
				defer wg.Done()
				var err error
				results[index], err = config.Function(pod, nodeNameToInfo, nodes)
				if err != nil {
					appendError(err)
				}
			}(i, priorityConfig)
		} else {
			results[i] = make(schedulerapi.HostPriorityList, len(nodes))
		}
	}
	processNode := func(index int) {
		nodeInfo := nodeNameToInfo[nodes[index].Name]
		var err error
		for i := range priorityConfigs {
			if priorityConfigs[i].Function != nil {
				continue
			}
			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				return
			}
		}
	}
	workqueue.Parallelize(16, len(nodes), processNode)
	for i, priorityConfig := range priorityConfigs {
		if priorityConfig.Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int, config algorithm.PriorityConfig) {
			defer wg.Done()
			if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
				appendError(err)
			}
		}(i, priorityConfig)
	}
	wg.Wait()
	if len(errs) != 0 {
		return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
	}

	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
	for i := range nodes {
		result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
		for j := range priorityConfigs {
			result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
		}
	}

	if len(extenders) != 0 && nodes != nil {
		combinedScores := make(map[string]int, len(nodeNameToInfo))
		for _, extender := range extenders {
			wg.Add(1)
			go func(ext algorithm.SchedulerExtender) {
				defer wg.Done()
				prioritizedList, weight, err := ext.Prioritize(pod, nodes)
				if err != nil {
					return
				}
				mu.Lock()
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(extender)
		}
		wg.Wait()
		for i := range result {
			result[i].Score += combinedScores[result[i].Host]
		}
	}

	if glog.V(10) {
		for i := range result {
			glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
		}
	}
	return result, nil
}
```
kube-scheduler first has each priority function score every node. Each priority function is registered in priorityFunctionMap together with a weight, and a node's final score is the sum over all priority functions of that function's score multiplied by its weight.
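A tiny numeric sketch of that weighted sum (the scores and weights below are made up):

```go
package main

import "fmt"

func main() {
	// A made-up example: two priority functions scored the same node, one with
	// weight 1 and one with weight 2.
	type scored struct {
		score, weight int
	}
	perPriority := []scored{{7, 1}, {5, 2}}

	// Same accumulation as:
	//   result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
	total := 0
	for _, s := range perPriority {
		total += s.score * s.weight
	}
	fmt.Println("node score:", total) // 7*1 + 5*2 = 17
}
```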
selectHost()
Finally, selectHost():
```go
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
	if len(priorityList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}

	sort.Sort(sort.Reverse(priorityList))
	maxScore := priorityList[0].Score
	firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })

	g.lastNodeIndexLock.Lock()
	ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
	g.lastNodeIndex++
	g.lastNodeIndexLock.Unlock()

	return priorityList[ix].Host, nil
}
```
selectHost() returns a node with the highest score; when several nodes tie for the top score, it round-robins among them across scheduling cycles, using lastNodeIndex modulo the number of top-scoring nodes.
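A standalone sketch of that tie-breaking behavior; the scores are made up and only the modulo arithmetic mirrors selectHost():

```go
package main

import "fmt"

func main() {
	// Already sorted in descending score order, as after sort.Reverse in selectHost().
	hosts := []string{"node-a", "node-b", "node-c"}
	scores := []int{10, 10, 7}

	// Count how many entries share the maximum score (what sort.Search computes).
	maxScore := scores[0]
	firstAfterMax := len(scores)
	for i, s := range scores {
		if s < maxScore {
			firstAfterMax = i
			break
		}
	}

	// lastNodeIndex plays the role of genericScheduler.lastNodeIndex.
	var lastNodeIndex uint64
	for round := 0; round < 4; round++ {
		ix := int(lastNodeIndex % uint64(firstAfterMax))
		lastNodeIndex++
		fmt.Printf("round %d -> %s\n", round, hosts[ix]) // node-a, node-b, node-a, node-b
	}
}
```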