podStatus
A pod's status looks like this:
```yaml
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: 2017-12-07T01:25:11Z
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: 2017-12-17T09:13:41Z
    status: "True"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: 2017-12-07T01:25:11Z
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: docker://397a1f998a38c6549c73df6220fa672eb45e1b8fa317e07a1ca428ed8b108c34
    image: ubuntu-ssh:v1
    imageID: docker://sha256:27312dad4272a61739c317b90bfab5bc129e38696df5c606b1041199caf03ea4
    lastState: {}
    name: ubuntu
    ready: true
    restartCount: 0
    state:
      running:
        startedAt: 2017-12-17T09:13:40Z
```
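A pod's status records two things. The conditions list describes the pod itself. The containerStatuses list describes each container in the pod.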
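The next sketch shows how the two halves of the status can be read programmatically. It decodes the JSON form of a status, which the apiserver returns, into deliberately simplified, hypothetical structs. These structs carry only a few of the fields shown above. They are not the real Kubernetes API types.

```go
package main

import (
	"encoding/json"
	"time"
)

// Hypothetical, trimmed-down mirrors of the PodStatus fields shown above;
// the real API types have many more fields.
type PodCondition struct {
	Type               string    `json:"type"`
	Status             string    `json:"status"`
	LastTransitionTime time.Time `json:"lastTransitionTime"`
}

type ContainerStatus struct {
	Name         string `json:"name"`
	Ready        bool   `json:"ready"`
	RestartCount int    `json:"restartCount"`
}

type PodStatus struct {
	Conditions        []PodCondition    `json:"conditions"`
	ContainerStatuses []ContainerStatus `json:"containerStatuses"`
}

// parseStatus decodes a JSON-encoded pod status; unknown fields are ignored.
func parseStatus(data []byte) (PodStatus, error) {
	var s PodStatus
	err := json.Unmarshal(data, &s)
	return s, err
}

// conditionStatus returns the Status ("True", "False", ...) of the condition
// with the given type, or "" if the pod has no such condition.
func conditionStatus(s PodStatus, condType string) string {
	for _, c := range s.Conditions {
		if c.Type == condType {
			return c.Status
		}
	}
	return ""
}
```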
statusManager
statusManager manages pod statuses. It is defined in /pkg/kubelet/status/status_manager.go:
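```go
type manager struct {
	kubeClient clientset.Interface
	podManager kubepod.Manager
	// Map from pod UID to the latest versioned status of that pod.
	podStatuses     map[types.UID]versionedPodStatus
	podStatusesLock sync.RWMutex
	// Status updates waiting to be synced by the sync goroutine.
	podStatusChannel chan podStatusSyncRequest
	// Map from (mirror) pod UID to the latest status version successfully
	// sent to the apiserver.
	apiStatusVersions map[types.UID]uint64
}
```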
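The main fields of manager are:
- kubeClient: the Kubernetes clientset, used to talk to the apiserver;
- podManager: maintains the pods held in memory;
- podStatuses: maps each pod UID to the pod's latest versioned status;
- podStatusesLock: a read-write lock that guards podStatuses;
- podStatusChannel: the channel on which status sync requests are queued for the sync goroutine;
- apiStatusVersions: records, for each pod, the version of the status last written to the apiserver. A status's version grows by one with every update.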
The manager (that is, statusManager) is started with its Start() method:
```go
func (m *manager) Start() {
	// Without a client there is nothing to sync with, so do not start.
	if m.kubeClient == nil {
		glog.Infof("Kubernetes client is nil, not starting status manager.")
		return
	}

	glog.Info("Starting to sync pod status with apiserver")
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share one goroutine, so they never run concurrently.
	go wait.Forever(func() {
		select {
		case syncRequest := <-m.podStatusChannel:
			m.syncPod(syncRequest.podUID, syncRequest.status)
		case <-syncTicker:
			m.syncBatch()
		}
	}, 0)
}
```
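If kubeClient is nil, Start() returns without doing anything. Otherwise it starts a goroutine that loops forever and does one of two things on each pass:
- takes a request from podStatusChannel and hands it to syncPod();
- calls syncBatch() every syncPeriod.

Because both happen on the same goroutine, syncPod() and syncBatch() never run at the same time.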
syncPod()
First, let's look at how syncPod() syncs a pod's status.
```go
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	// Skip statuses that the apiserver already has.
	if !m.needsUpdate(uid, status) {
		glog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
		return
	}

	pod, err := m.kubeClient.Core().Pods(status.podNamespace).Get(status.podName)
	if errors.IsNotFound(err) {
		glog.V(3).Infof("Pod %q (%s) does not exist on the server", status.podName, uid)
		return
	}
	if err == nil {
		// A different UID means the pod was deleted and recreated under the same name.
		translatedUID := m.podManager.TranslatePodUID(pod.UID)
		if len(translatedUID) > 0 && translatedUID != uid {
			glog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
			m.deletePodStatus(uid)
			return
		}
		pod.Status = status.status
		pod, err = m.kubeClient.Core().Pods(pod.Namespace).UpdateStatus(pod)
		if err == nil {
			glog.V(3).Infof("Status for pod %q updated successfully: %+v", format.Pod(pod), status)
			// Remember which version the apiserver now has.
			m.apiStatusVersions[pod.UID] = status.version
			if kubepod.IsMirrorPod(pod) {
				// Mirror pods are not deleted here.
				return
			}
			if pod.DeletionTimestamp == nil {
				return
			}
			if !notRunning(pod.Status.ContainerStatuses) {
				glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
				return
			}
			// Delete immediately (grace period 0), but only if the UID still matches.
			deleteOptions := api.NewDeleteOptions(0)
			deleteOptions.Preconditions = api.NewUIDPreconditions(string(pod.UID))
			glog.V(2).Infof("Removing Pod %q from etcd", format.Pod(pod))
			if err = m.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err == nil {
				glog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
				m.deletePodStatus(uid)
				return
			}
		}
	}

	// The update failed; the periodic syncBatch() will retry it.
	glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
}
```
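The main flow of syncPod() is:
- unless needsUpdate() reports that the status is already up to date, fetch the pod from the apiserver using the name and namespace recorded with the status;
- replace the pod's status and write it back with UpdateStatus();
- if the pod is not a mirror pod, its DeletionTimestamp is set, and none of its containers is still running, delete the pod through the client.

The third step is the last step of the whole pod deletion process: removing the pod from etcd.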
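The deletion decision at the end of syncPod() can be isolated as a small predicate. The sketch below uses hypothetical, simplified types. The rule that a terminated or waiting container counts as "not running" is an assumption modelled on the notRunning() check described above.

```go
package main

// Hypothetical stand-ins for the API types: a container's state is one of
// "running", "waiting" or "terminated".
type ContainerStatus struct {
	Name  string
	State string
}

type Pod struct {
	IsMirror          bool // mirror pods are never deleted here
	DeletionRequested bool // stands in for DeletionTimestamp != nil
	ContainerStatuses []ContainerStatus
}

// notRunning treats a container as not running when it is terminated or
// waiting; an empty list also counts as not running.
func notRunning(statuses []ContainerStatus) bool {
	for _, s := range statuses {
		if s.State != "terminated" && s.State != "waiting" {
			return false
		}
	}
	return true
}

// shouldDelete mirrors the tail of syncPod(): only a non-mirror pod whose
// deletion was requested and that has no running container is removed.
func shouldDelete(p Pod) bool {
	if p.IsMirror || !p.DeletionRequested {
		return false
	}
	return notRunning(p.ContainerStatuses)
}
```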
syncBatch()
syncBatch() finds every pod in the manager whose status needs to be updated and syncs each of them.
```go
func (m *manager) syncBatch() {
	var updatedStatuses []podStatusSyncRequest
	podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
	func() { // Critical section: only read podStatuses under the lock.
		m.podStatusesLock.RLock()
		defer m.podStatusesLock.RUnlock()

		// Drop versions of pods that no longer exist.
		for uid := range m.apiStatusVersions {
			_, hasPod := m.podStatuses[uid]
			_, hasMirror := mirrorToPod[uid]
			if !hasPod && !hasMirror {
				delete(m.apiStatusVersions, uid)
			}
		}

		for uid, status := range m.podStatuses {
			syncedUID := uid
			// Static pods are synced through their mirror pods.
			if mirrorUID, ok := podToMirror[uid]; ok {
				if mirrorUID == "" {
					glog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
					continue
				}
				syncedUID = mirrorUID
			}
			if m.needsUpdate(syncedUID, status) {
				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
			} else if m.needsReconcile(uid, status.status) {
				// Forget the synced version so that syncPod() does not skip this pod.
				delete(m.apiStatusVersions, syncedUID)
				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
			}
		}
	}()

	for _, update := range updatedStatuses {
		m.syncPod(update.podUID, update.status)
	}
}
```
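syncBatch() first drops the apiStatusVersions entries of pods that no longer exist. It then checks every pod in the manager's podStatuses with two tests:
- needsUpdate(): an update is needed if apiStatusVersions has no version for the pod, or if the version it records is lower than the version of the cached status;
- needsReconcile(): an update is needed if the status cached in the manager differs from the status held by podManager. The manager's copy is treated as authoritative. Such a pod's apiStatusVersions entry is deleted, so the needsUpdate() check inside syncPod() does not skip it.

Every pod that needs an update is then passed to syncPod(), which writes its status to the apiserver and therefore to etcd.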
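The two tests can be sketched with plain maps. In this sketch a status is reduced to a phase string, and podManagerPhase is a hypothetical stand-in for the status held by podManager. The static/mirror pod translation is left out. The sketch is meant only to show which pods end up being synced.

```go
package main

import "sort"

type UID string

// versionedStatus is a simplified stand-in for versionedPodStatus.
type versionedStatus struct {
	phase   string // placeholder for the full PodStatus
	version uint64
}

// needsUpdate: sync if the apiserver never accepted a version for this pod,
// or the accepted version is older than the cached one.
func needsUpdate(apiVersions map[UID]uint64, uid UID, s versionedStatus) bool {
	latest, ok := apiVersions[uid]
	return !ok || latest < s.version
}

// collect returns, sorted, the UIDs a batch pass would sync.
func collect(cache map[UID]versionedStatus, apiVersions map[UID]uint64, podManagerPhase map[UID]string) []UID {
	// Drop versions of pods that are no longer cached (deleting while ranging is safe in Go).
	for uid := range apiVersions {
		if _, ok := cache[uid]; !ok {
			delete(apiVersions, uid)
		}
	}
	var out []UID
	for uid, s := range cache {
		if needsUpdate(apiVersions, uid, s) {
			out = append(out, uid)
		} else if podManagerPhase[uid] != s.phase { // the needsReconcile case
			delete(apiVersions, uid) // force the later needsUpdate check to pass
			out = append(out, uid)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}
```

In the usage below, "a" is behind by one version and "d" was never synced. "c" is up to date by version but disagrees with podManager, so it is reconciled. "b" is skipped.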
updateStatusInternal()
updateStatusInternal() is the package-internal entry point for updating a pod's status.
```go
// The caller must hold podStatusesLock.
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
	// Pick the previous status: cached, else the mirror pod's, else the pod's own.
	var oldStatus api.PodStatus
	cachedStatus, isCached := m.podStatuses[pod.UID]
	if isCached {
		oldStatus = cachedStatus.status
	} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
		oldStatus = mirrorPod.Status
	} else {
		oldStatus = pod.Status
	}

	// Ready: keep the old LastTransitionTime unless the condition's status changed.
	if _, readyCondition := api.GetPodCondition(&status, api.PodReady); readyCondition != nil {
		lastTransitionTime := unversioned.Now()
		_, oldReadyCondition := api.GetPodCondition(&oldStatus, api.PodReady)
		if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
			lastTransitionTime = oldReadyCondition.LastTransitionTime
		}
		readyCondition.LastTransitionTime = lastTransitionTime
	}

	// Initialized: same rule as Ready.
	if _, initCondition := api.GetPodCondition(&status, api.PodInitialized); initCondition != nil {
		lastTransitionTime := unversioned.Now()
		_, oldInitCondition := api.GetPodCondition(&oldStatus, api.PodInitialized)
		if oldInitCondition != nil && initCondition.Status == oldInitCondition.Status {
			lastTransitionTime = oldInitCondition.LastTransitionTime
		}
		initCondition.LastTransitionTime = lastTransitionTime
	}

	// StartTime never changes once set; set it to now if neither status has one.
	if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
		status.StartTime = oldStatus.StartTime
	} else if status.StartTime.IsZero() {
		now := unversioned.Now()
		status.StartTime = &now
	}

	normalizeStatus(pod, &status)
	// Nothing to do if the status is unchanged, unless the update is forced.
	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
		return false
	}

	newStatus := versionedPodStatus{
		status:       status,
		version:      cachedStatus.version + 1,
		podName:      pod.Name,
		podNamespace: pod.Namespace,
	}
	m.podStatuses[pod.UID] = newStatus

	// Non-blocking send: if the channel is full, the periodic syncBatch() picks it up.
	select {
	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
		return true
	default:
		glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
			format.Pod(pod), status)
		return false
	}
}
```
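updateStatusInternal() first fixes up the timestamps:
- The LastTransitionTime of the Ready and Initialized conditions is set to now only if the condition's status changed. Otherwise the old value is kept.
- StartTime is carried over from the old status, or set to now if neither status has one.

If the resulting status equals the cached one and forceUpdate is false, the function returns without doing anything. Otherwise it wraps the status in a new versionedPodStatus whose version is one higher than the cached one and stores it in podStatuses. It then tries to push a podStatusSyncRequest onto podStatusChannel, and syncPod() later writes that status to the apiserver. The send does not block. If the channel is full, the update is skipped for now and left to the next syncBatch().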
statusManager provides three wrappers around updateStatusInternal(): SetPodStatus(), SetContainerReadiness() and TerminatePod().
SetPodStatus() is implemented as follows:
```go
func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
	m.podStatusesLock.Lock()
	defer m.podStatusesLock.Unlock()
	// Cache a deep copy so later changes by the caller cannot leak in.
	status, err := copyStatus(&status)
	if err != nil {
		return
	}
	// Force the update when the pod is being deleted, so that deletion can proceed.
	m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}
```
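SetPodStatus() makes a deep copy of the given status and passes it straight to updateStatusInternal(). If the pod has a DeletionTimestamp, forceUpdate is true, so the update is queued even if the status has not changed.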
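Copying a struct in Go is not enough on its own, because slices inside the copy still share their backing arrays with the original. The helpers below are hypothetical illustrations of that point. They are not the kubelet's copyStatus().

```go
package main

type ContainerStatus struct {
	Name  string
	Ready bool
}

type PodStatus struct {
	Phase             string
	ContainerStatuses []ContainerStatus
}

// shallowCopy copies the struct, but the slice header still points at the
// caller's backing array.
func shallowCopy(s PodStatus) PodStatus { return s }

// deepCopy also copies the slice, so later mutations by the caller cannot
// change the copy.
func deepCopy(s PodStatus) PodStatus {
	out := s
	out.ContainerStatuses = append([]ContainerStatus(nil), s.ContainerStatuses...)
	return out
}
```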
SetContainerReadiness() is implemented as follows:
```go
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
	m.podStatusesLock.Lock()
	defer m.podStatusesLock.Unlock()

	pod, ok := m.podManager.GetPodByUID(podUID)
	if !ok {
		glog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
		return
	}

	oldStatus, found := m.podStatuses[pod.UID]
	if !found {
		glog.Warningf("Container readiness changed before pod has synced: %q - %q",
			format.Pod(pod), containerID.String())
		return
	}

	// Find the container whose readiness changed.
	containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
	if !ok {
		glog.Warningf("Container readiness changed for unknown container: %q - %q",
			format.Pod(pod), containerID.String())
		return
	}

	if containerStatus.Ready == ready {
		glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
			format.Pod(pod), containerID.String())
		return
	}

	// Work on a copy so the cached status is not modified in place.
	status, err := copyStatus(&oldStatus.status)
	if err != nil {
		return
	}
	containerStatus, _, _ = findContainerStatus(&status, containerID.String())
	containerStatus.Ready = ready

	// Recompute the pod's Ready condition and replace it (or append it if missing).
	readyConditionIndex := -1
	for i, condition := range status.Conditions {
		if condition.Type == api.PodReady {
			readyConditionIndex = i
			break
		}
	}
	readyCondition := GeneratePodReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)
	if readyConditionIndex != -1 {
		status.Conditions[readyConditionIndex] = readyCondition
	} else {
		glog.Warningf("PodStatus missing PodReady condition: %+v", status)
		status.Conditions = append(status.Conditions, readyCondition)
	}

	m.updateStatusInternal(pod, status, false)
}
```
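SetContainerReadiness() sets the Ready flag of one container in the pod and then recomputes the pod's Ready condition. It does nothing in any of these cases: the pod is unknown, the pod has no cached status yet, the container is not found, or the readiness has not changed.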
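The update-on-a-copy and replace-or-append steps can be sketched as a pure function. The names setReadiness and readyCond are hypothetical. The condition generator is passed in as a parameter so that the sketch does not depend on GeneratePodReadyCondition().

```go
package main

type ContainerStatus struct {
	ContainerID string
	Ready       bool
}

type Condition struct {
	Type   string
	Status string
}

type Status struct {
	ContainerStatuses []ContainerStatus
	Conditions        []Condition
}

// setReadiness returns a copy of s with the given container's Ready flag set
// and the "Ready" condition recomputed by readyCond, replaced in place or
// appended if missing. changed is false for an unknown container or no-op.
func setReadiness(s Status, id string, ready bool, readyCond func(Status) Condition) (out Status, changed bool) {
	idx := -1
	for i, c := range s.ContainerStatuses {
		if c.ContainerID == id {
			idx = i
			break
		}
	}
	if idx == -1 || s.ContainerStatuses[idx].Ready == ready {
		return s, false
	}
	// Copy the slices so the caller's (cached) status is not mutated.
	out = s
	out.ContainerStatuses = append([]ContainerStatus(nil), s.ContainerStatuses...)
	out.Conditions = append([]Condition(nil), s.Conditions...)
	out.ContainerStatuses[idx].Ready = ready

	cond := readyCond(out)
	for i := range out.Conditions {
		if out.Conditions[i].Type == "Ready" {
			out.Conditions[i] = cond
			return out, true
		}
	}
	out.Conditions = append(out.Conditions, cond)
	return out, true
}
```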
The new Ready condition is produced by GeneratePodReadyCondition(), defined in /pkg/kubelet/status/generate.go:
```go
func GeneratePodReadyCondition(spec *api.PodSpec, containerStatuses []api.ContainerStatus, podPhase api.PodPhase) api.PodCondition {
	// Without any container statuses, readiness is unknown.
	if containerStatuses == nil {
		return api.PodCondition{
			Type:   api.PodReady,
			Status: api.ConditionFalse,
			Reason: "UnknownContainerStatuses",
		}
	}
	// Sort the spec's containers into unknown (no status) and unready ones.
	unknownContainers := []string{}
	unreadyContainers := []string{}
	for _, container := range spec.Containers {
		if containerStatus, ok := api.GetContainerStatus(containerStatuses, container.Name); ok {
			if !containerStatus.Ready {
				unreadyContainers = append(unreadyContainers, container.Name)
			}
		} else {
			unknownContainers = append(unknownContainers, container.Name)
		}
	}

	// A succeeded pod whose containers are all known is complete, not ready.
	if podPhase == api.PodSucceeded && len(unknownContainers) == 0 {
		return api.PodCondition{
			Type:   api.PodReady,
			Status: api.ConditionFalse,
			Reason: "PodCompleted",
		}
	}

	unreadyMessages := []string{}
	if len(unknownContainers) > 0 {
		unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unknown status: %s", unknownContainers))
	}
	if len(unreadyContainers) > 0 {
		unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unready status: %s", unreadyContainers))
	}
	unreadyMessage := strings.Join(unreadyMessages, ", ")
	if unreadyMessage != "" {
		return api.PodCondition{
			Type:    api.PodReady,
			Status:  api.ConditionFalse,
			Reason:  "ContainersNotReady",
			Message: unreadyMessage,
		}
	}

	return api.PodCondition{
		Type:   api.PodReady,
		Status: api.ConditionTrue,
	}
}
```
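GeneratePodReadyCondition() checks every container in the pod spec. The pod's Ready condition is True only if every container has a status and is ready. Otherwise it is False, with one of these reasons:
- UnknownContainerStatuses: there are no container statuses at all;
- PodCompleted: the pod has Succeeded and every container's status is known;
- ContainersNotReady: some containers are unknown or unready. They are listed in the message.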
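The following standalone sketch reproduces these rules. It uses simplified inputs: a list of container names from the spec, and a hypothetical name-to-Ready map that stands in for the container statuses. A nil map plays the role of a nil status list.

```go
package main

import (
	"fmt"
	"strings"
)

type Condition struct {
	Status  string // "True" or "False"
	Reason  string
	Message string
}

// readyCondition applies the rules described above to simplified inputs:
// specContainers lists container names, ready maps name -> Ready flag.
func readyCondition(specContainers []string, ready map[string]bool, phase string) Condition {
	if ready == nil {
		return Condition{Status: "False", Reason: "UnknownContainerStatuses"}
	}
	var unknown, unready []string
	for _, name := range specContainers {
		r, ok := ready[name]
		switch {
		case !ok:
			unknown = append(unknown, name)
		case !r:
			unready = append(unready, name)
		}
	}
	if phase == "Succeeded" && len(unknown) == 0 {
		return Condition{Status: "False", Reason: "PodCompleted"}
	}
	var msgs []string
	if len(unknown) > 0 {
		msgs = append(msgs, fmt.Sprintf("containers with unknown status: %s", unknown))
	}
	if len(unready) > 0 {
		msgs = append(msgs, fmt.Sprintf("containers with unready status: %s", unready))
	}
	if len(msgs) > 0 {
		return Condition{Status: "False", Reason: "ContainersNotReady", Message: strings.Join(msgs, ", ")}
	}
	return Condition{Status: "True"}
}
```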
TerminatePod() is implemented as follows:
```go
func (m *manager) TerminatePod(pod *api.Pod) {
	m.podStatusesLock.Lock()
	defer m.podStatusesLock.Unlock()
	// Start from the cached status if there is one, else from the pod's own.
	oldStatus := &pod.Status
	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
		oldStatus = &cachedStatus.status
	}
	status, err := copyStatus(oldStatus)
	if err != nil {
		return
	}
	// Mark every container, including init containers, as terminated.
	for i := range status.ContainerStatuses {
		status.ContainerStatuses[i].State = api.ContainerState{
			Terminated: &api.ContainerStateTerminated{},
		}
	}
	for i := range status.InitContainerStatuses {
		status.InitContainerStatuses[i].State = api.ContainerState{
			Terminated: &api.ContainerStateTerminated{},
		}
	}
	// Pass the modified copy: passing pod.Status here would discard the
	// Terminated states set above.
	m.updateStatusInternal(pod, status, true)
}
```
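TerminatePod() takes a copy of the pod's status, using the cached status if one exists. It marks every container in status.ContainerStatuses and status.InitContainerStatuses as Terminated. It then hands the copy to updateStatusInternal() with forceUpdate set.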
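The essential point of TerminatePod() is that the terminated states are written to a copy, and it is that copy which must be stored. The sketch below shows this with hypothetical, simplified types in which a container state is just a string.

```go
package main

type ContainerState string

const (
	Running    ContainerState = "running"
	Terminated ContainerState = "terminated"
)

type ContainerStatus struct {
	Name  string
	State ContainerState
}

type PodStatus struct {
	InitContainerStatuses []ContainerStatus
	ContainerStatuses     []ContainerStatus
}

// terminateAll returns a copy of s in which every init and regular container
// is marked Terminated; the input is left untouched, so the caller must store
// the returned value rather than the original status.
func terminateAll(s PodStatus) PodStatus {
	out := PodStatus{
		InitContainerStatuses: append([]ContainerStatus(nil), s.InitContainerStatuses...),
		ContainerStatuses:     append([]ContainerStatus(nil), s.ContainerStatuses...),
	}
	for i := range out.InitContainerStatuses {
		out.InitContainerStatuses[i].State = Terminated
	}
	for i := range out.ContainerStatuses {
		out.ContainerStatuses[i].State = Terminated
	}
	return out
}
```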