In Kubernetes, the kubelet is the component that deals with the actual machine: it manages the resources behind each pod, such as containers, volumes, and networking. This analysis looks at the kubelet's config package and at how pod information reaches the kubelet from the apiserver, from files, or over HTTP. The apiserver source delivers pod changes stored in etcd; the file source delivers changes read from local files, such as static pods; I have not used the HTTP source myself.
Mux
Mux was already covered in "kube-proxy分析(一)-config-v1.5.2", so the description below is copied from there.
A Mux can create multiple channels and, together with the storage of the corresponding config, merge the data flowing through those channels. Mux is defined in /pkg/util/config/config.go:
```go
type Mux struct {
	merger Merger

	sourceLock sync.RWMutex
	sources    map[string]chan interface{}
}
```
Here, the Merger is responsible for merging the data, and sources maps each source name to its channel.
A Mux is created with the NewMux() function:
```go
func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}
```
A Merger has to be supplied when the Mux is created, and sources starts out empty. So how is a source channel created, and how does its content get handed to the Merger? Let's look at Mux's Channel() method:
```go
func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}
```
Channel() creates a channel, starts a goroutine that runs listen(), and finally returns the channel. listen() simply ranges over the newly created channel and passes each item to merger.Merge(). So by calling Mux's Channel() we obtain a channel, and anything we write into it is automatically handed to the merger.
So what exactly is a Merger? The Merger interface is also defined in /pkg/util/config/config.go:
```go
type Merger interface {
	Merge(source string, update interface{}) error
}
```
As you can see, any struct that implements Merge() counts as a Merger; the interface is as small as it gets.
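To make the Mux/Merger relationship concrete, here is a minimal, self-contained sketch in the same spirit. The printMerger type and the simplified mux below are illustrative stand-ins, not the actual Kubernetes code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Merger mirrors the interface above: anything with a Merge() method will do.
type Merger interface {
	Merge(source string, update interface{}) error
}

// printMerger is a toy Merger that just prints whatever it receives.
type printMerger struct{}

func (printMerger) Merge(source string, update interface{}) error {
	fmt.Printf("merged from %q: %v\n", source, update)
	return nil
}

// mux is a stripped-down version of the Mux described above: one channel per
// source, each drained by its own goroutine into the shared merger.
type mux struct {
	merger  Merger
	lock    sync.Mutex
	sources map[string]chan interface{}
}

func newMux(m Merger) *mux {
	return &mux{merger: m, sources: make(map[string]chan interface{})}
}

func (m *mux) Channel(source string) chan interface{} {
	m.lock.Lock()
	defer m.lock.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	go func() { // the listen() part: drain the channel into the merger
		for update := range ch {
			m.merger.Merge(source, update)
		}
	}()
	return ch
}

func main() {
	m := newMux(printMerger{})
	apiCh := m.Channel("api") // every source gets its own channel
	fileCh := m.Channel("file")

	apiCh <- "pod-a from the apiserver"
	fileCh <- "pod-b from a static manifest"
	time.Sleep(100 * time.Millisecond) // give the listener goroutines time to drain
}
```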
podStorage
podStorage is where the data coming from the different sources is aggregated and stored; in other words, it is a Merger. It is defined in /pkg/kubelet/config/config.go:
```go
type podStorage struct {
	podLock sync.RWMutex
	pods    map[string]map[string]*api.Pod
	mode    PodConfigNotificationMode

	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate

	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String

	recorder record.EventRecorder
}
```
podStorage has an updates channel, where all the processed data ends up, and a pods field that stores the pods it already knows about, grouped by source. Its workflow is therefore: consume the data on the channels returned by the Mux, process it in Merge(), and finally send the resulting changes to updates.
Let's start with podStorage's Merge():
```go
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			s.updates <- *adds
		}
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}

	// ......

	default:
		panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
	}

	return nil
}
```
In practice the podStorage mode is PodConfigNotificationIncremental. Merge() calls merge() to sort the change from the given source into the adds, updates, deletes, removes, and reconciles sets, and then pushes the non-empty ones onto the updates channel.
merge() is defined as follows:
```go
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
	s.podLock.Lock()
	defer s.podLock.Unlock()

	addPods := []*api.Pod{}
	updatePods := []*api.Pod{}
	deletePods := []*api.Pod{}
	removePods := []*api.Pod{}
	reconcilePods := []*api.Pod{}

	pods := s.pods[source]
	if pods == nil {
		pods = make(map[string]*api.Pod)
	}

	updatePodsFunc := func(newPods []*api.Pod, oldPods, pods map[string]*api.Pod) {
		filtered := filterInvalidPods(newPods, source, s.recorder)
		for _, ref := range filtered {
			name := kubecontainer.GetPodFullName(ref)
			if ref.Annotations == nil {
				ref.Annotations = make(map[string]string)
			}
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
			if existing, found := oldPods[name]; found {
				pods[name] = existing
				needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
				if needUpdate {
					updatePods = append(updatePods, existing)
				} else if needReconcile {
					reconcilePods = append(reconcilePods, existing)
				} else if needGracefulDelete {
					deletePods = append(deletePods, existing)
				}
				continue
			}
			recordFirstSeenTime(ref)
			pods[name] = ref
			addPods = append(addPods, ref)
		}
	}

	update := change.(kubetypes.PodUpdate)
	switch update.Op {
	case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
		if update.Op == kubetypes.ADD {
			glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
		} else if update.Op == kubetypes.DELETE {
			glog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
		} else {
			glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
		}
		updatePodsFunc(update.Pods, pods, pods)

	case kubetypes.REMOVE:
		glog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
		for _, value := range update.Pods {
			name := kubecontainer.GetPodFullName(value)
			if existing, found := pods[name]; found {
				delete(pods, name)
				removePods = append(removePods, existing)
				continue
			}
		}

	case kubetypes.SET:
		glog.V(4).Infof("Setting pods for source %s", source)
		s.markSourceSet(source)
		oldPods := pods
		pods = make(map[string]*api.Pod)
		updatePodsFunc(update.Pods, oldPods, pods)
		for name, existing := range oldPods {
			if _, found := pods[name]; !found {
				removePods = append(removePods, existing)
			}
		}

	default:
		glog.Warningf("Received invalid update type: %v", update)
	}

	s.pods[source] = pods

	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
	deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
	removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

	return adds, updates, deletes, removes, reconciles
}

func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
	if !podsDifferSemantically(existing, ref) {
		if !reflect.DeepEqual(existing.Status, ref.Status) {
			existing.Status = ref.Status
			needReconcile = true
		}
		return
	}
	ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]
	existing.Spec = ref.Spec
	existing.Labels = ref.Labels
	existing.DeletionTimestamp = ref.DeletionTimestamp
	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
	existing.Status = ref.Status
	updateAnnotations(existing, ref)
	if ref.DeletionTimestamp != nil {
		needGracefulDelete = true
	} else {
		needUpdate = true
	}
	return
}
```
The data merge() consumes from a source channel usually carries update.Op == kubetypes.SET, and the classification is done by updatePodsFunc.
From merge() and checkAndUpdatePod(), the meaning of each set can be summarized as follows (see the sketch after this list):
- adds: pods the kubelet needs to create; their Op is kubetypes.ADD;
- updates: pods the kubelet needs to update; their Op is kubetypes.UPDATE;
- deletes: pods the kubelet needs to "delete" gracefully, i.e. pods whose DeletionTimestamp has been set; their Op is kubetypes.DELETE;
- removes: pods the kubelet should remove outright: they still exist on the kubelet side but are no longer present on the source side; their Op is kubetypes.REMOVE;
- reconciles: pods whose status the kubelet needs to reconcile; their Op is kubetypes.RECONCILE.
Note that each SET update carries the full pod set from its source. updatePodsFunc classifies that full set, and each resulting group is labeled with the Op that tells the consumer which operation those pods require.
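To illustrate what this classification amounts to, here is a small self-contained sketch of diffing one SET against the pods already known for a source. The pod type and diffSet function are hypothetical stand-ins for api.Pod and updatePodsFunc, and the reconciles case (status-only changes) is left out for brevity:

```go
package main

import "fmt"

// pod is a hypothetical stand-in for api.Pod, reduced to what the diff needs.
type pod struct {
	Name     string
	Spec     string
	Deleting bool // stands in for DeletionTimestamp != nil
}

// diffSet mimics what merge() does for a kubetypes.SET update: compare the
// full desired set against the pods already known for this source and put
// every pod into adds, updates, deletes, or removes.
func diffSet(known map[string]pod, desired []pod) (adds, updates, deletes, removes []pod) {
	seen := map[string]bool{}
	for _, p := range desired {
		seen[p.Name] = true
		old, found := known[p.Name]
		switch {
		case !found:
			adds = append(adds, p) // new to the kubelet
		case p.Deleting:
			deletes = append(deletes, p) // marked for graceful deletion
		case p.Spec != old.Spec:
			updates = append(updates, p) // spec changed
		}
	}
	for name, p := range known {
		if !seen[name] {
			removes = append(removes, p) // vanished from the source entirely
		}
	}
	return
}

func main() {
	known := map[string]pod{
		"a": {Name: "a", Spec: "v1"},
		"b": {Name: "b", Spec: "v1"},
		"c": {Name: "c", Spec: "v1"},
	}
	desired := []pod{
		{Name: "a", Spec: "v2"},                 // changed           -> updates
		{Name: "b", Spec: "v1", Deleting: true}, // marked for delete -> deletes
		{Name: "d", Spec: "v1"},                 // brand new         -> adds
		// "c" is absent from the desired set    -> removes
	}
	adds, updates, deletes, removes := diffSet(known, desired)
	fmt.Println("adds:", adds)
	fmt.Println("updates:", updates)
	fmt.Println("deletes:", deletes)
	fmt.Println("removes:", removes)
}
```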
PodConfig
PodConfig manages the whole pod-synchronization setup and is defined in /pkg/kubelet/config/config.go:
```go
type PodConfig struct {
	pods *podStorage
	mux  *config.Mux

	updates chan kubetypes.PodUpdate

	sourcesLock sync.Mutex
	sources     sets.String
}
```
PodConfig's main fields are:
- pods: a podStorage used to aggregate the data;
- mux: the Mux;
- updates: the same channel as the podStorage's updates field; every processed event ends up here for external consumers;
- sources: records the names of the sources registered with this config.
PodConfig implements a Channel() method:
```go
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.Channel(source)
}
```
Channel() records the source name and calls the mux's Channel(), which starts the whole merge pipeline for that source.
How, then, does the outside world get hold of the updates channel where the processed data is collected? For that, PodConfig also implements Updates():
```go
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
	return c.updates
}
```
Updates() is trivial: it simply returns the updates channel.
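Put together, the caller-side pattern looks roughly like the sketch below. It relies on NewPodConfig(), Channel(), and Updates() as they appear in this version of the tree; the "example" source name is made up, and the fake event recorder from pkg/client/record is only there to satisfy the signature:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/kubelet/config"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

func main() {
	// Same notification mode the kubelet itself uses.
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, record.NewFakeRecorder(16))

	// Registering a (made-up) source returns the channel the mux listens on.
	ch := cfg.Channel("example")

	// A source always sends its complete pod set with Op: SET; an empty set
	// is enough to watch the plumbing work end to end.
	ch <- kubetypes.PodUpdate{Op: kubetypes.SET, Source: "example"}

	// Because this is the first SET from the source, Merge() emits a
	// notification even though nothing was added (the firstSet case above).
	u := <-cfg.Updates()
	fmt.Println(u.Op, u.Source, len(u.Pods))
}
```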
The apiserver source
Let's take the apiserver source as an example of how a source is implemented. It is defined in /pkg/kubelet/config/apiserver.go:
```go
func NewSourceApiserver(c *clientset.Clientset, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", api.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*api.Pod
		for _, o := range objs {
			pods = append(pods, o.(*api.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}
```
NewSourceApiserver() uses the list-watch mechanism to push pod changes onto the channel passed in as a parameter, i.e. the channel created by the mux, which is exactly the channel the merger consumes. The UndeltaStore calls send() with the complete pod list after every change, which is why this source always emits updates with Op: kubetypes.SET.
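A source, then, is just anything that keeps pushing kubetypes.PodUpdate values with Op: SET into the channel it was given. As a sketch, a hypothetical custom source written in the same style as the file and HTTP sources might look like this (everything named "demo" is made up for illustration):

```go
package main

import (
	"time"

	"k8s.io/kubernetes/pkg/api"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

// newSourceDemo re-lists its pods every period and pushes the complete set
// downstream, mirroring what NewSourceFile/NewSourceURL do. The updates
// channel would be the one obtained from PodConfig.Channel("demo").
func newSourceDemo(listPods func() []*api.Pod, period time.Duration, updates chan<- interface{}) {
	go func() {
		for {
			updates <- kubetypes.PodUpdate{
				Pods:   listPods(),
				Op:     kubetypes.SET,
				Source: "demo",
			}
			time.Sleep(period)
		}
	}()
}

func main() {
	updates := make(chan interface{})
	newSourceDemo(func() []*api.Pod { return nil }, time.Second, updates)
	<-updates // in the kubelet, this receive is done by the mux's listen()
}
```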
The call site
makePodSourceConfig() in /pkg/kubelet/kubelet.go sets up the pod source config:
```go
func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName types.NodeName) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if kubeCfg.ManifestURLHeader != "" {
		pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
		if len(pieces) != 2 {
			return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
		}
		manifestURLHeader.Set(pieces[0], pieces[1])
	}

	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	if kubeCfg.PodManifestPath != "" {
		glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
		config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	if kubeCfg.ManifestURL != "" {
		glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
		config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		glog.Infof("Watching apiserver")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
	}
	return cfg, nil
}
```
So makePodSourceConfig() creates a new PodConfig and then calls NewSourceFile(), NewSourceURL(), and NewSourceApiserver() to set up the three sources. Each source writes its data into its own channel, and that channel is obtained from PodConfig's Channel(). With that, the PodConfig can aggregate and process the data from all three sources, and external consumers only have to call PodConfig's Updates() to get the single channel carrying the processed results, which is quite convenient.
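On the consuming side, the kubelet's sync loop ranges over the Updates() channel and dispatches on the Op of every PodUpdate (in this version that happens in syncLoopIteration()). The sketch below shows the shape of that dispatch; consumeUpdates and the print statements are placeholders for the real handlers:

```go
package main

import (
	"fmt"

	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

// consumeUpdates dispatches on the Op of every PodUpdate read from the
// channel returned by PodConfig.Updates(). The bodies are placeholders.
func consumeUpdates(updates <-chan kubetypes.PodUpdate) {
	for u := range updates {
		switch u.Op {
		case kubetypes.ADD:
			fmt.Printf("create %d pod(s) from %q\n", len(u.Pods), u.Source)
		case kubetypes.UPDATE:
			fmt.Printf("update %d pod(s) from %q\n", len(u.Pods), u.Source)
		case kubetypes.DELETE:
			fmt.Printf("gracefully delete %d pod(s) from %q\n", len(u.Pods), u.Source)
		case kubetypes.REMOVE:
			fmt.Printf("remove %d pod(s) from %q\n", len(u.Pods), u.Source)
		case kubetypes.RECONCILE:
			fmt.Printf("reconcile status of %d pod(s) from %q\n", len(u.Pods), u.Source)
		}
	}
}

func main() {
	updates := make(chan kubetypes.PodUpdate, 2)
	updates <- kubetypes.PodUpdate{Op: kubetypes.ADD, Source: kubetypes.ApiserverSource}
	updates <- kubetypes.PodUpdate{Op: kubetypes.REMOVE, Source: kubetypes.FileSource}
	close(updates)
	consumeUpdates(updates)
}
```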