podStatus

一个pod的status如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
status:
conditions:
- lastProbeTime: null
lastTransitionTime: 2017-12-07T01:25:11Z
status: "True"
type: Initialized
- lastProbeTime: null
lastTransitionTime: 2017-12-17T09:13:41Z
status: "True"
type: Ready
- lastProbeTime: null
lastTransitionTime: 2017-12-07T01:25:11Z
status: "True"
type: PodScheduled
containerStatuses:
- containerID: docker://397a1f998a38c6549c73df6220fa672eb45e1b8fa317e07a1ca428ed8b108c34
image: ubuntu-ssh:v1
imageID: docker://sha256:27312dad4272a61739c317b90bfab5bc129e38696df5c606b1041199caf03ea4
lastState: {}
name: ubuntu
ready: true
restartCount: 0
state:
running:
startedAt: 2017-12-17T09:13:40Z

pod的status可以表示pod的状态及pod下的容器的状态。

statusManager

statusManager主要用来管理pod的status,定义在/pkg/kubelet/status/status_manager.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Updates pod statuses in apiserver. Writes only when new status has changed.
// All methods are thread-safe.
type manager struct {
kubeClient clientset.Interface
podManager kubepod.Manager
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
//***Fankang***//
//***pod status channel***//
podStatusChannel chan podStatusSyncRequest
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
// apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[types.UID]uint64
}

manager的主要字段含义如下:

  • kubeClient: k8s clientset,用于和apiserver交互;
  • podManager: 负责内存中pod的维护;
  • podStatuses: 存储pod及状态的map关系;
  • podStatusChannel: statusManager对外暴露的channel;
  • apiStatusVersions: 维护更新完的pod的status更新号,每更新一次会加1。

manager(即statusManager)可以使用Start()方法启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (m *manager) Start() {
// Don't start the status manager if we don't have a client. This will happen
// on the master, where the kubelet is responsible for bootstrapping the pods
// of the master components.
if m.kubeClient == nil {
glog.Infof("Kubernetes client is nil, not starting status manager.")
return
}
glog.Info("Starting to sync pod status with apiserver")
//***时间触发器的实现***//
syncTicker := time.Tick(syncPeriod)
// syncPod and syncBatch share the same go routine to avoid sync races.
go wait.Forever(func() {
select {
//***消费podStatusChannel中的数据***//
case syncRequest := <-m.podStatusChannel:
//***进行同步***//
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
m.syncBatch()
}
}, 0)
}

Start()会启动go routine,在go routine中主要完成两件事情:

  1. 消费podStatusChannel中的内容,调用syncPod()处理内容;
  2. 一定时间调用syncBatch()。

syncPod()

先来看syncPod()是如何同步pod状态的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// syncPod syncs the given status with the API server. The caller must not hold the lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if !m.needsUpdate(uid, status) {
glog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
return
}
// TODO: make me easier to express from client code
//***获取Pod***//
pod, err := m.kubeClient.Core().Pods(status.podNamespace).Get(status.podName)
if errors.IsNotFound(err) {
glog.V(3).Infof("Pod %q (%s) does not exist on the server", status.podName, uid)
// If the Pod is deleted the status will be cleared in
// RemoveOrphanedStatuses, so we just ignore the update here.
return
}
if err == nil {
translatedUID := m.podManager.TranslatePodUID(pod.UID)
if len(translatedUID) > 0 && translatedUID != uid {
glog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
m.deletePodStatus(uid)
return
}
//***把pod的status更新为新的status***//
pod.Status = status.status
// TODO: handle conflict as a retry, make that easier too.
//***更新pod***//
pod, err = m.kubeClient.Core().Pods(pod.Namespace).UpdateStatus(pod)
if err == nil {
glog.V(3).Infof("Status for pod %q updated successfully: %+v", format.Pod(pod), status)
m.apiStatusVersions[pod.UID] = status.version
if kubepod.IsMirrorPod(pod) {
// We don't handle graceful deletion of mirror pods.
return
}
//***此处有DeletionTimestamp***//
//***如果pod.DeletionTimestamp被设置,则会接着往下走到删除容器流程***//
if pod.DeletionTimestamp == nil {
return
}
if !notRunning(pod.Status.ContainerStatuses) {
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
return
}
deleteOptions := api.NewDeleteOptions(0)
// Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
deleteOptions.Preconditions = api.NewUIDPreconditions(string(pod.UID))
glog.V(2).Infof("Removing Pod %q from etcd", format.Pod(pod))
//***删除pod(从apiserver/ETCD中删除该pod对象)***//
if err = m.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
m.deletePodStatus(uid)
return
}
}
}
// We failed to update status, wait for periodic sync to retry.
glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
}

syncPod()的主要流程如下:

  1. 依据传入的status通过client获取pod;
  2. 修改pod的status,并更新;
  3. 如果pod的DeletionTimestamp不为空,且pod中的容器没有处于运行中,则通过client删除pod。

步骤中的第3步是删除pod整个流程的最后一步,即从ETCD中删除pod。

syncBatch()

syncBatch()会把manager中需要更新的pod的状态全部更新一遍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//***感觉syncBatch()是把所有的需要更新的status全部在apiserver中更新一遍***//
func (m *manager) syncBatch() {
var updatedStatuses []podStatusSyncRequest
podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
func() { // Critical section
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
// Clean up orphaned versions.
for uid := range m.apiStatusVersions {
_, hasPod := m.podStatuses[uid]
_, hasMirror := mirrorToPod[uid]
if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
}
}
for uid, status := range m.podStatuses {
syncedUID := uid
if mirrorUID, ok := podToMirror[uid]; ok {
if mirrorUID == "" {
glog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
continue
}
syncedUID = mirrorUID
}
if m.needsUpdate(syncedUID, status) {
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
} else if m.needsReconcile(uid, status.status) {
// Delete the apiStatusVersions here to force an update on the pod status
// In most cases the deleted apiStatusVersions here should be filled
// soon after the following syncPod() [If the syncPod() sync an update
// successfully].
delete(m.apiStatusVersions, syncedUID)
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
}
}
}()
//***更新pod status***//
for _, update := range updatedStatuses {
m.syncPod(update.podUID, update.status)
}
}

syncBatch()把manager podStatuses中的pod全部检查一遍,检查的方法如下:

  • needsUpdate(): 如果apiStatusVersions中记录的status版本号小于参数status的版本号,或者apiStatusVersions中不存在该pod的记录,则需要更新;
  • needsReconcile():如果manager中的status和podManager中的不一致,则需要更新,以manager中的为准。

如果需要更新,则调用syncPod()更新ETCD。

updateStatusInternal()

updateStatusInternal()是包内部更新status的入口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
var oldStatus api.PodStatus
cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached {
oldStatus = cachedStatus.status
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
oldStatus = mirrorPod.Status
} else {
oldStatus = pod.Status
}
// Set ReadyCondition.LastTransitionTime.
if _, readyCondition := api.GetPodCondition(&status, api.PodReady); readyCondition != nil {
// Need to set LastTransitionTime.
lastTransitionTime := unversioned.Now()
_, oldReadyCondition := api.GetPodCondition(&oldStatus, api.PodReady)
if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
lastTransitionTime = oldReadyCondition.LastTransitionTime
}
readyCondition.LastTransitionTime = lastTransitionTime
}
// Set InitializedCondition.LastTransitionTime.
if _, initCondition := api.GetPodCondition(&status, api.PodInitialized); initCondition != nil {
// Need to set LastTransitionTime.
lastTransitionTime := unversioned.Now()
_, oldInitCondition := api.GetPodCondition(&oldStatus, api.PodInitialized)
if oldInitCondition != nil && initCondition.Status == oldInitCondition.Status {
lastTransitionTime = oldInitCondition.LastTransitionTime
}
initCondition.LastTransitionTime = lastTransitionTime
}
// ensure that the start time does not change across updates.
if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
status.StartTime = oldStatus.StartTime
} else if status.StartTime.IsZero() {
// if the status has no start time, we need to set an initial time
//***2017-06-09 13:19:23.929906722 +0800 CST***//
//***所以在此处,用的是计算机系统时区***//
now := unversioned.Now()
status.StartTime = &now
}
normalizeStatus(pod, &status)
// The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically.
if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
return false // No new status.
}
newStatus := versionedPodStatus{
status: status,
version: cachedStatus.version + 1,
podName: pod.Name,
podNamespace: pod.Namespace,
}
m.podStatuses[pod.UID] = newStatus
select {
//***把更新放到podStatusChannel中以供更新***//
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
return true
default:
// Let the periodic syncBatch handle the update if the channel is full.
// We can't block, since we hold the mutex lock.
glog.V(4).Infof("Skpping the status update for pod %q for now because the channel is full; status: %+v",
format.Pod(pod), status)
return false
}
}

updateStatusInternal()先更新readyCondition.LastTransitionTime, initCondition.LastTransitionTime, status.StartTime时间,然后封装成新的status(此时的version加1),把新的status又封装成podStatusSyncRequest放入podStatusChannel,然后syncPod()会自动去更新pod的status。

在statusManager中,封装有SetPodStatus(),SetContainerReadiness(), TerminatePod(),这些都调用了updateStatusInternal()。

SetPodStatus()实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//***设置Pod Status***//
func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
// Make sure we're caching a deep copy.
status, err := copyStatus(&status)
if err != nil {
return
}
// Force a status update if deletion timestamp is set. This is necessary
// because if the pod is in the non-running state, the pod worker still
// needs to be able to trigger an update and/or deletion.
m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}

SetPodStatus()直接把status更新到pod中。

SetContainerReadiness()实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//***设置pod中某container的状态为ready***//
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
glog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
return
}
oldStatus, found := m.podStatuses[pod.UID]
if !found {
glog.Warningf("Container readiness changed before pod has synced: %q - %q",
format.Pod(pod), containerID.String())
return
}
// Find the container to update.
containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
if !ok {
glog.Warningf("Container readiness changed for unknown container: %q - %q",
format.Pod(pod), containerID.String())
return
}
if containerStatus.Ready == ready {
glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
format.Pod(pod), containerID.String())
return
}
// Make sure we're not updating the cached version.
status, err := copyStatus(&oldStatus.status)
if err != nil {
return
}
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
containerStatus.Ready = ready
// Update pod condition.
readyConditionIndex := -1
for i, condition := range status.Conditions {
if condition.Type == api.PodReady {
readyConditionIndex = i
break
}
}
readyCondition := GeneratePodReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)
if readyConditionIndex != -1 {
status.Conditions[readyConditionIndex] = readyCondition
} else {
glog.Warningf("PodStatus missing PodReady condition: %+v", status)
status.Conditions = append(status.Conditions, readyCondition)
}
m.updateStatusInternal(pod, status, false)
}

SetContainerReadiness()设置pod中某container的状态为ready,并更新pod的condition。
更新pod的condition由GeneratePodReadyCondition()生成,定义在/pkg/kubelet/status/generate.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// GeneratePodReadyCondition returns ready condition if all containers in a pod are ready, else it
// returns an unready condition.
func GeneratePodReadyCondition(spec *api.PodSpec, containerStatuses []api.ContainerStatus, podPhase api.PodPhase) api.PodCondition {
// Find if all containers are ready or not.
if containerStatuses == nil {
return api.PodCondition{
Type: api.PodReady,
Status: api.ConditionFalse,
Reason: "UnknownContainerStatuses",
}
}
unknownContainers := []string{}
unreadyContainers := []string{}
for _, container := range spec.Containers {
if containerStatus, ok := api.GetContainerStatus(containerStatuses, container.Name); ok {
if !containerStatus.Ready {
unreadyContainers = append(unreadyContainers, container.Name)
}
} else {
unknownContainers = append(unknownContainers, container.Name)
}
}
// If all containers are known and succeeded, just return PodCompleted.
if podPhase == api.PodSucceeded && len(unknownContainers) == 0 {
return api.PodCondition{
Type: api.PodReady,
Status: api.ConditionFalse,
Reason: "PodCompleted",
}
}
unreadyMessages := []string{}
if len(unknownContainers) > 0 {
unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unknown status: %s", unknownContainers))
}
if len(unreadyContainers) > 0 {
unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unready status: %s", unreadyContainers))
}
unreadyMessage := strings.Join(unreadyMessages, ", ")
if unreadyMessage != "" {
return api.PodCondition{
Type: api.PodReady,
Status: api.ConditionFalse,
Reason: "ContainersNotReady",
Message: unreadyMessage,
}
}
return api.PodCondition{
Type: api.PodReady,
Status: api.ConditionTrue,
}
}

GeneratePodReadyCondition()会查看pod中所有container的状态,如果container全部处于Ready状态,则pod的Ready condition为True;否则pod的Ready condition为False。

TerminatePod()实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (m *manager) TerminatePod(pod *api.Pod) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
oldStatus := &pod.Status
if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
oldStatus = &cachedStatus.status
}
status, err := copyStatus(oldStatus)
if err != nil {
return
}
//***置status.ContainerStatuses中所有容器状态为Terminated***//
for i := range status.ContainerStatuses {
status.ContainerStatuses[i].State = api.ContainerState{
Terminated: &api.ContainerStateTerminated{},
}
}
//***置status.InitContainerStatuses中所有容器状态为Terminated***//
for i := range status.InitContainerStatuses {
status.InitContainerStatuses[i].State = api.ContainerState{
Terminated: &api.ContainerStateTerminated{},
}
}
m.updateStatusInternal(pod, pod.Status, true)
}

TerminatePod()会把status.ContainerStatuses和status.InitContainerStatuses中的container全部置为Terminated状态。(注意:上面引用的代码在最后调用的是 updateStatusInternal(pod, pod.Status, true),传入的是原始的 pod.Status 而非修改后的 status,这样前面对容器状态的 Terminated 修改实际上不会生效——这一点在后续的Kubernetes版本中被修正为传入 status,读者可对照上游源码确认。)