The previous post analyzed how every pod event ends up being processed and placed onto the updates channel, i.e. the producer side of updates. This post looks at the consumer side of updates.
syncLoopIteration
In /pkg/kubelet/kubelet.go we have:
```go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	......
	kl.syncLoop(updates, kl)
}
```
syncLoop() is the consumer of the data in updates. It is defined as follows:
```go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	for {
		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			time.Sleep(5 * time.Second)
			continue
		}
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
	}
}
```
syncLoop() in turn calls syncLoopIteration(), defined as follows:
```go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	kl.syncLoopMonitor.Store(kl.clock.Now())
	select {
	case u, open := <-configCh:
		if !open {
			glog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			glog.Errorf("Kubelet does not support snapshot update")
		}
		kl.sourcesReady.AddSource(u.Source)
	......
	}
	return true
}
```
syncLoopIteration() listens on several channels; the branch we care about here is configCh. As the code shows, event types map to handler functions as follows:
- kubetypes.ADD: HandlePodAdditions()
- kubetypes.UPDATE: HandlePodUpdates()
- kubetypes.REMOVE: HandlePodRemoves()
- kubetypes.RECONCILE: HandlePodReconcile()
- kubetypes.DELETE: HandlePodUpdates()
- kubetypes.SET: not supported yet.
Because podConfig hands over the full set of pods for each classification at once, each handler also processes multiple pods per call, which makes the processing much more efficient.
handler
HandlePodAdditions() is defined as follows:
```go
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
	start := kl.clock.Now()
	var criticalPods []*api.Pod
	var nonCriticalPods []*api.Pod
	for _, p := range pods {
		if kubetypes.IsCriticalPod(p) {
			criticalPods = append(criticalPods, p)
		} else {
			nonCriticalPods = append(nonCriticalPods, p)
		}
	}
	sort.Sort(sliceutils.PodsByCreationTime(criticalPods))
	sort.Sort(sliceutils.PodsByCreationTime(nonCriticalPods))
	for _, pod := range append(criticalPods, nonCriticalPods...) {
		existingPods := kl.podManager.GetPods()
		kl.podManager.AddPod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		if !kl.podIsTerminated(pod) {
			activePods := kl.filterOutTerminatedPods(existingPods)
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}
```
HandlePodAdditions() splits the pods into criticalPods (whose exact role is not covered here) and nonCriticalPods, then calls dispatchWork() on each pod with the sync type set to kubetypes.SyncPodCreate.
HandlePodUpdates() is defined as follows:
```go
func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
	}
}
```
HandlePodUpdates() calls dispatchWork() on each pod with the sync type set to kubetypes.SyncPodUpdate.
HandlePodRemoves() is defined as follows:
```go
func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.DeletePod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		if err := kl.deletePod(pod); err != nil {
			glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
		}
		kl.probeManager.RemovePod(pod)
	}
}
```
HandlePodRemoves() calls deletePod() directly to delete each pod.
HandlePodReconcile() is defined as follows:
```go
func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod)
		if eviction.PodIsEvicted(pod.Status) {
			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
			}
		}
	}
}
```
HandlePodReconcile() updates each pod's state in the podManager; the statusManager then acts on the state held in the podManager.
Now let's look at dispatchWork():
```go
func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			if err != nil {
				metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
			}
		},
	})
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
```
As you can see, dispatchWork() hands the pod to the podWorkers' UpdatePod() method.
podWorkers
podWorkers maintains one workUpdate channel per pod, and for each such channel a managePodLoop() goroutine consumes its contents. podWorkers is defined in /pkg/kubelet/pod_workers.go:
```go
type podWorkers struct {
	podLock                   sync.Mutex
	podUpdates                map[types.UID]chan UpdatePodOptions
	isWorking                 map[types.UID]bool
	lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
	workQueue                 queue.WorkQueue
	syncPodFn                 syncPodFnType
	recorder                  record.EventRecorder
	backOffPeriod             time.Duration
	resyncInterval            time.Duration
	podCache                  kubecontainer.Cache
}
```
The main fields of podWorkers are:
- podUpdates: where the UpdatePodOptions channels live (the podUpdates parameter seen in later functions), keyed by pod UID;
- syncPodFn: the function that actually syncs a pod. A distilled sketch of how these two fields work together follows this list.
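These two fields carry the whole design: one channel per pod UID, one goroutine per channel, and an injected sync function. Below is a minimal, self-contained sketch of that pattern with hypothetical names (workers, workItem, syncFn); it is only an illustration of the idea, not kubelet code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// workItem stands in for UpdatePodOptions in this sketch.
type workItem struct {
	uid  string
	note string
}

// workers dispatches each pod's work to its own goroutine,
// mirroring the podUpdates + syncPodFn fields above.
type workers struct {
	mu     sync.Mutex
	queues map[string]chan workItem // one channel per pod UID, like podUpdates
	syncFn func(workItem)           // injected handler, like syncPodFn
}

func newWorkers(syncFn func(workItem)) *workers {
	return &workers{queues: map[string]chan workItem{}, syncFn: syncFn}
}

// Update lazily creates the per-pod channel and its consuming goroutine,
// then queues the item for that pod.
func (w *workers) Update(item workItem) {
	w.mu.Lock()
	ch, ok := w.queues[item.uid]
	if !ok {
		ch = make(chan workItem, 1)
		w.queues[item.uid] = ch
		go func() { // one loop per pod, like managePodLoop
			for it := range ch {
				w.syncFn(it)
			}
		}()
	}
	w.mu.Unlock()
	ch <- item
}

func main() {
	w := newWorkers(func(it workItem) { fmt.Println("sync", it.uid, it.note) })
	w.Update(workItem{uid: "pod-a", note: "create"})
	w.Update(workItem{uid: "pod-b", note: "create"})
	time.Sleep(100 * time.Millisecond) // give the worker goroutines time to run
}
```

Note that this sketch simply blocks when a pod's channel is full; the real podWorkers avoids that with the isWorking and lastUndeliveredWorkUpdate fields, which we will see in UpdatePod() below.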
podWorkers instances are created with newPodWorkers():
```go
func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
	resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
	return &podWorkers{
		podUpdates:                map[types.UID]chan UpdatePodOptions{},
		isWorking:                 map[types.UID]bool{},
		lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
		syncPodFn:                 syncPodFn,
		recorder:                  recorder,
		workQueue:                 workQueue,
		resyncInterval:            resyncInterval,
		backOffPeriod:             backOffPeriod,
		podCache:                  podCache,
	}
}
```
Next, UpdatePod():
```go
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool

	p.podLock.Lock()
	defer p.podLock.Unlock()
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true
		podUpdates <- *options
	} else {
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if !found || update.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[pod.UID] = *options
		}
	}
}
```
The flow of UpdatePod() is:
- if the pod's UID has no entry in podUpdates yet, create a new podUpdates channel and start a managePodLoop() goroutine to consume it;
- if the pod's worker is idle, mark it as working and send options into its podUpdates channel; otherwise record options as lastUndeliveredWorkUpdate (unless a kill update is already pending), so only the most recent undelivered update is kept. This coalescing behavior is sketched below.
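To make the second bullet concrete, here is a hedged, self-contained sketch of just that coalescing decision, using hypothetical names (coalescer, item, offer); the corresponding kubelet fields are isWorking and lastUndeliveredWorkUpdate, and Kill plays the role of kubetypes.SyncPodKill.

```go
package main

import (
	"fmt"
	"sync"
)

// item stands in for UpdatePodOptions; Kill marks a SyncPodKill-style update.
type item struct {
	UID  string
	Note string
	Kill bool
}

// coalescer mirrors just the isWorking / lastUndeliveredWorkUpdate bookkeeping
// of podWorkers: while a pod's worker is busy, only the latest pending update
// is remembered, and a pending kill is never overwritten.
type coalescer struct {
	mu      sync.Mutex
	working map[string]bool // like isWorking
	pending map[string]item // like lastUndeliveredWorkUpdate
}

// offer returns the item to deliver now, or false if it was parked as pending.
func (c *coalescer) offer(it item) (item, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.working[it.UID] {
		c.working[it.UID] = true
		return it, true
	}
	if prev, ok := c.pending[it.UID]; !ok || !prev.Kill {
		c.pending[it.UID] = it // the older pending update, if any, is dropped
	}
	return item{}, false
}

func main() {
	c := &coalescer{working: map[string]bool{}, pending: map[string]item{}}
	fmt.Println(c.offer(item{UID: "pod-a", Note: "create"}))  // delivered immediately
	fmt.Println(c.offer(item{UID: "pod-a", Note: "update"}))  // parked: worker busy
	fmt.Println(c.offer(item{UID: "pod-a", Note: "update2"})) // replaces the parked update
	fmt.Println(c.pending["pod-a"].Note)                      // "update2"
}
```

In the real code, the parked update is picked up again after the current sync completes (see wrapUp() in managePodLoop() below); that path is not shown in this sketch.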
Let's see how managePodLoop() processes the data in podUpdates:
```go
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				return err
			}
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			if err != nil {
				return err
			}
			return nil
		}()
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			glog.Errorf("Error syncing pod %s, skipping: %v", update.Pod.UID, err)
			p.recorder.Eventf(update.Pod, api.EventTypeWarning, events.FailedSync, "Error syncing pod, skipping: %v", err)
		}
		p.wrapUp(update.Pod.UID, err)
	}
}
```
managePodLoop() takes each entry out of podUpdates and calls syncPodFn() to process it.
syncPodFn()
Here syncPodFn() is syncPod() from /pkg/kubelet/kubelet.go:
```go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType

	if updateType == kubetypes.SyncPodKill {
		killPodOptions := o.killPodOptions
		if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
			return fmt.Errorf("kill pod options are required if update type is kill")
		}
		apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			utilruntime.HandleError(err)
			return err
		}
		return nil
	}

	var firstSeenTime time.Time
	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
	}
	if updateType == kubetypes.SyncPodCreate {
		if !firstSeenTime.IsZero() {
			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		} else {
			glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
		}
	}

	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	podStatus.IP = apiPodStatus.PodIP
	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning && !firstSeenTime.IsZero() {
		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	}

	runnable := kl.canRunPod(pod)
	if !runnable.Admit {
		apiPodStatus.Reason = runnable.Reason
		apiPodStatus.Message = runnable.Message
		const waitingReason = "Blocked"
		for _, cs := range apiPodStatus.InitContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
		for _, cs := range apiPodStatus.ContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
	}

	kl.statusManager.SetPodStatus(pod, apiPodStatus)

	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
			syncErr = fmt.Errorf("error killing pod: %v", err)
			utilruntime.HandleError(syncErr)
		} else {
			if !runnable.Admit {
				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
			}
		}
		return syncErr
	}

	if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !podUsesHostNetwork(pod) {
		return fmt.Errorf("network is not ready: %v", rs)
	}

	pcm := kl.containerManager.NewPodContainerManager()
	if !kl.podIsTerminated(pod) {
		firstSync := true
		for _, containerStatus := range apiPodStatus.ContainerStatuses {
			if containerStatus.State.Running != nil {
				firstSync = false
				break
			}
		}
		podKilled := false
		if !pcm.Exists(pod) && !firstSync {
			kl.killPod(pod, nil, podStatus, nil)
			podKilled = true
		}
		if !(podKilled && pod.Spec.RestartPolicy == api.RestartPolicyNever) {
			if err := pcm.EnsureExists(pod); err != nil {
				return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
			}
		}
	}

	if kubepod.IsStaticPod(pod) {
		podFullName := kubecontainer.GetPodFullName(pod)
		deleted := false
		if mirrorPod != nil {
			if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
				glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
				if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
					glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
				} else {
					deleted = true
				}
			}
		}
		if mirrorPod == nil || deleted {
			glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
				glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
			}
		}
	}

	if err := kl.makePodDataDirs(pod); err != nil {
		glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
		return err
	}

	if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
		kl.recorder.Eventf(pod, api.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
		return err
	}

	pullSecrets, err := kl.getPullSecretsForPod(pod)
	if err != nil {
		glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
		return err
	}

	result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err = result.Error(); err != nil {
		return err
	}

	if !kl.shapingEnabled() {
		return nil
	}
	ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
	if err != nil {
		return err
	}
	if egress != nil || ingress != nil {
		if podUsesHostNetwork(pod) {
			kl.recorder.Event(pod, api.EventTypeWarning, events.HostNetworkNotSupported, "Bandwidth shaping is not currently supported on the host network")
		} else if kl.shaper != nil {
			if len(apiPodStatus.PodIP) > 0 {
				err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", apiPodStatus.PodIP), egress, ingress)
			}
		} else {
			kl.recorder.Event(pod, api.EventTypeWarning, events.UndefinedShaper, "Pod requests bandwidth shaping, but the shaper is undefined")
		}
	}
	return nil
}
```
syncPod() is the core of the whole kubelet: both pod creation and deletion go through it. Because syncPod() touches so many of the kubelet's subsystems, a detailed analysis is deferred until volumeManager, podManager, and the rest have been introduced. For now it is enough to know that syncPod() processes the pod passed to it, keeping the pods on the node consistent with the pods in etcd.
Managing podWorkers
Finally, let's see how the kubelet wires up its podWorkers.
In NewMainKubelet() in /pkg/kubelet/kubelet.go, we have:
```go
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue,
	klet.resyncInterval, backOffPeriod, klet.podCache)
```
As you can see, the syncPodFn passed to newPodWorkers() is syncPod.
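Worth noting is that klet.syncPod is passed as a Go method value: the receiver is bound at that point, so podWorkers can later invoke the function without knowing anything about the Kubelet type. A tiny sketch of this injection pattern (all names here are hypothetical stand-ins, not kubelet code) looks like this:

```go
package main

import "fmt"

// syncFn mirrors the role of syncPodFnType: the worker only sees a function.
type syncFn func(pod string) error

// worker stands in for podWorkers; it stores the injected function.
type worker struct{ sync syncFn }

func newWorker(fn syncFn) *worker { return &worker{sync: fn} }

// kubelet stands in for Kubelet; syncPod is the method we want to inject.
type kubelet struct{ node string }

func (k *kubelet) syncPod(pod string) error {
	fmt.Printf("syncing %s on node %s\n", pod, k.node)
	return nil
}

func main() {
	k := &kubelet{node: "node-1"}
	// k.syncPod is a method value: the receiver k is captured here,
	// just as klet.syncPod is captured when passed to newPodWorkers.
	w := newWorker(k.syncPod)
	_ = w.sync("nginx-pod")
}
```

This is why podWorkers only needs the syncPodFnType signature in its struct, which keeps the worker loop decoupled from the rest of the Kubelet.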