podManager
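Within the kubelet, podManager maintains the in-memory set of pods and mirrorPods. Every pod creation or deletion on the node is synchronized to podManager, so podManager can be regarded as holding the information for the pods running on this node. A note on mirrorPods: in Kubernetes, a static pod is created directly by the kubelet, which makes it hard for the rest of the system to manage. The kubelet therefore creates, through the API server, a mirrorPod for each static pod to represent it.
podManager is defined in /pkg/kubelet/pod/pod_manager.go: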
    type basicManager struct {
        lock sync.RWMutex

        podByUID       map[types.UID]*api.Pod
        mirrorPodByUID map[types.UID]*api.Pod

        podByFullName       map[string]*api.Pod
        mirrorPodByFullName map[string]*api.Pod

        translationByUID map[types.UID]types.UID

        MirrorClient
    }
basicManager is the podManager referred to above. Its main fields are:
- podByUID: maps a UID to its pod;
- mirrorPodByUID: maps a mirrorPod's UID to the mirrorPod;
- podByFullName: maps a fullName to its pod;
- mirrorPodByFullName: maps a fullName to its mirrorPod;
- translationByUID: maps a mirrorPod's UID to the UID of the corresponding pod;
- MirrorClient: the client used to manage mirrorPods.
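How the maps relate to one another can be sketched with a stripped-down model. This is not the kubelet code: `Pod`, `index`, `newExampleIndex`, and `staticPodFor` are hypothetical names, and the `name_namespace` full-name format is assumed for illustration.

```go
package main

import "fmt"

// UID and Pod are simplified stand-ins for types.UID and api.Pod.
type UID string

type Pod struct {
	UID  UID
	Name string
}

// index holds the same five maps as basicManager (lock and client omitted).
type index struct {
	podByUID            map[UID]*Pod
	mirrorPodByUID      map[UID]*Pod
	podByFullName       map[string]*Pod
	mirrorPodByFullName map[string]*Pod
	translationByUID    map[UID]UID // mirror pod UID -> static pod UID
}

// newExampleIndex registers one static pod and its mirror pod by hand.
// Both share a full name but carry different UIDs.
func newExampleIndex() *index {
	static := &Pod{UID: "static-uid", Name: "etcd"}
	mirror := &Pod{UID: "mirror-uid", Name: "etcd"}
	const fullName = "etcd_kube-system" // assumed name_namespace format
	return &index{
		podByUID:            map[UID]*Pod{static.UID: static},
		mirrorPodByUID:      map[UID]*Pod{mirror.UID: mirror},
		podByFullName:       map[string]*Pod{fullName: static},
		mirrorPodByFullName: map[string]*Pod{fullName: mirror},
		translationByUID:    map[UID]UID{mirror.UID: static.UID},
	}
}

// staticPodFor resolves a mirror pod UID to the static pod it represents.
func (ix *index) staticPodFor(mirrorUID UID) (*Pod, bool) {
	uid, ok := ix.translationByUID[mirrorUID]
	if !ok {
		return nil, false
	}
	p, ok := ix.podByUID[uid]
	return p, ok
}

func main() {
	ix := newExampleIndex()
	if p, ok := ix.staticPodFor("mirror-uid"); ok {
		fmt.Println("mirror-uid represents", p.UID)
	}
}
```

The full name is the join key between the two families of maps, while translationByUID caches the result of that join by UID.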
A basicManager is created with NewBasicPodManager():
    func NewBasicPodManager(client MirrorClient) Manager {
        pm := &basicManager{}
        pm.MirrorClient = client
        pm.SetPods(nil)
        return pm
    }
basicManager::SetPods()
SetPods() replaces the pod information held by basicManager with newPods: it re-creates all five maps and then loads newPods into them.
    func (pm *basicManager) SetPods(newPods []*api.Pod) {
        pm.lock.Lock()
        defer pm.lock.Unlock()

        pm.podByUID = make(map[types.UID]*api.Pod)
        pm.podByFullName = make(map[string]*api.Pod)
        pm.mirrorPodByUID = make(map[types.UID]*api.Pod)
        pm.mirrorPodByFullName = make(map[string]*api.Pod)
        pm.translationByUID = make(map[types.UID]types.UID)

        pm.updatePodsInternal(newPods...)
    }
basicManager::AddPod()
AddPod() adds a pod to basicManager. It simply delegates to UpdatePod().
    func (pm *basicManager) AddPod(pod *api.Pod) {
        pm.UpdatePod(pod)
    }
basicManager::UpdatePod()
UpdatePod() inserts or updates a pod in basicManager.
    func (pm *basicManager) UpdatePod(pod *api.Pod) {
        pm.lock.Lock()
        defer pm.lock.Unlock()
        pm.updatePodsInternal(pod)
    }
basicManager::updatePodsInternal()
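updatePodsInternal() updates pods. For a mirrorPod it updates mirrorPodByUID and mirrorPodByFullName, and records a translationByUID entry if a pod with the same full name is already known. For a regular pod it updates podByUID and podByFullName, and records the translationByUID entry if a mirrorPod with the same full name is already known.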
    func (pm *basicManager) updatePodsInternal(pods ...*api.Pod) {
        for _, pod := range pods {
            podFullName := kubecontainer.GetPodFullName(pod)
            if IsMirrorPod(pod) {
                pm.mirrorPodByUID[pod.UID] = pod
                pm.mirrorPodByFullName[podFullName] = pod
                if p, ok := pm.podByFullName[podFullName]; ok {
                    pm.translationByUID[pod.UID] = p.UID
                }
            } else {
                pm.podByUID[pod.UID] = pod
                pm.podByFullName[podFullName] = pod
                if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
                    pm.translationByUID[mirror.UID] = pod.UID
                }
            }
        }
    }
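Because both branches look up the counterpart by full name, the translation entry appears no matter which of the two pods arrives first. The following is a minimal sketch of that property, with the UID maps omitted. `manager`, `update`, and `isMirror` are hypothetical names, and the `name_namespace` full-name format is an assumption.

```go
package main

import "fmt"

type UID string

// Pod is a simplified stand-in for api.Pod.
type Pod struct {
	UID         UID
	Name        string
	Namespace   string
	Annotations map[string]string
}

const mirrorKey = "kubernetes.io/config.mirror"

type manager struct {
	podByFullName       map[string]*Pod
	mirrorPodByFullName map[string]*Pod
	translationByUID    map[UID]UID
}

func newManager() *manager {
	return &manager{
		podByFullName:       map[string]*Pod{},
		mirrorPodByFullName: map[string]*Pod{},
		translationByUID:    map[UID]UID{},
	}
}

func isMirror(p *Pod) bool {
	_, ok := p.Annotations[mirrorKey]
	return ok
}

// update follows the same branching as updatePodsInternal.
func (m *manager) update(pods ...*Pod) {
	for _, p := range pods {
		name := p.Name + "_" + p.Namespace // assumed full-name format
		if isMirror(p) {
			m.mirrorPodByFullName[name] = p
			if s, ok := m.podByFullName[name]; ok {
				m.translationByUID[p.UID] = s.UID
			}
		} else {
			m.podByFullName[name] = p
			if mp, ok := m.mirrorPodByFullName[name]; ok {
				m.translationByUID[mp.UID] = p.UID
			}
		}
	}
}

func main() {
	static := &Pod{UID: "s1", Name: "etcd", Namespace: "kube-system"}
	mirror := &Pod{UID: "m1", Name: "etcd", Namespace: "kube-system",
		Annotations: map[string]string{mirrorKey: "hash"}}

	a := newManager()
	a.update(static, mirror) // static pod first
	b := newManager()
	b.update(mirror, static) // mirror pod first

	fmt.Println(a.translationByUID["m1"], b.translationByUID["m1"]) // s1 s1
}
```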
basicManager::DeletePod()
DeletePod() removes a pod from basicManager. For a mirrorPod it clears the pod's entries from mirrorPodByUID, mirrorPodByFullName, and translationByUID; for a regular pod it clears only the entries in podByUID and podByFullName.
    func (pm *basicManager) DeletePod(pod *api.Pod) {
        pm.lock.Lock()
        defer pm.lock.Unlock()
        podFullName := kubecontainer.GetPodFullName(pod)
        if IsMirrorPod(pod) {
            delete(pm.mirrorPodByUID, pod.UID)
            delete(pm.mirrorPodByFullName, podFullName)
            delete(pm.translationByUID, pod.UID)
        } else {
            delete(pm.podByUID, pod.UID)
            delete(pm.podByFullName, podFullName)
        }
    }
basicManager::GetPods()
GetPods() returns all pods in podByUID.
    func (pm *basicManager) GetPods() []*api.Pod {
        pm.lock.RLock()
        defer pm.lock.RUnlock()
        return podsMapToPods(pm.podByUID)
    }
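The locking pattern used throughout basicManager (write lock for mutations, read lock for getters) can be tried in isolation. In this sketch `store`, `add`, `pods`, and `mapToSlice` are hypothetical; in particular `mapToSlice` is only a guess at what podsMapToPods does, since that helper is not shown here.

```go
package main

import (
	"fmt"
	"sync"
)

type UID string

type Pod struct{ UID UID }

type store struct {
	lock     sync.RWMutex
	podByUID map[UID]*Pod
}

// add takes the write lock, as UpdatePod does.
func (s *store) add(p *Pod) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.podByUID[p.UID] = p
}

// pods takes the read lock and returns a fresh slice, so callers can
// iterate the result after the lock has been released.
func (s *store) pods() []*Pod {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return mapToSlice(s.podByUID)
}

// mapToSlice collects the map values (assumed behaviour of podsMapToPods).
func mapToSlice(m map[UID]*Pod) []*Pod {
	out := make([]*Pod, 0, len(m))
	for _, p := range m {
		out = append(out, p)
	}
	return out
}

func main() {
	s := &store{podByUID: map[UID]*Pod{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			s.add(&Pod{UID: UID(fmt.Sprintf("pod-%d", n))})
			_ = s.pods() // readers may overlap with each other
		}(i)
	}
	wg.Wait()
	fmt.Println(len(s.pods())) // 4
}
```

Returning a new slice rather than the map itself means later writes do not affect a result a caller is still iterating over.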
basicManager::GetPodsAndMirrorPods()
GetPodsAndMirrorPods() returns both the pods and the mirrorPods.
    func (pm *basicManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) {
        pm.lock.RLock()
        defer pm.lock.RUnlock()
        pods := podsMapToPods(pm.podByUID)
        mirrorPods := podsMapToPods(pm.mirrorPodByUID)
        return pods, mirrorPods
    }
basicManager::GetPodByUID()
GetPodByUID() returns the pod with the given UID.
    func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
        pm.lock.RLock()
        defer pm.lock.RUnlock()
        pod, ok := pm.podByUID[uid]
        return pod, ok
    }
basicManager::GetPodByName()
GetPodByName() looks up a pod by namespace and name.
    func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
        podFullName := kubecontainer.BuildPodFullName(name, namespace)
        return pm.GetPodByFullName(podFullName)
    }
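The full name used as the map key is built from the name and the namespace. The sketch below assumes the two are joined with an underscore; `buildPodFullName` and `parsePodFullName` are hypothetical stand-ins for kubecontainer.BuildPodFullName and kubecontainer.ParsePodFullName, not copies of them.

```go
package main

import (
	"fmt"
	"strings"
)

// buildPodFullName joins name and namespace with an underscore.
// The format is an assumption about kubecontainer.BuildPodFullName.
func buildPodFullName(name, namespace string) string {
	return name + "_" + namespace
}

// parsePodFullName reverses buildPodFullName and rejects malformed input.
func parsePodFullName(fullName string) (name, namespace string, err error) {
	parts := strings.Split(fullName, "_")
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("malformed pod full name %q", fullName)
	}
	return parts[0], parts[1], nil
}

func main() {
	full := buildPodFullName("etcd", "kube-system")
	name, ns, err := parsePodFullName(full)
	fmt.Println(full, name, ns, err)
}
```

Note the argument order: GetPodByName() takes (namespace, name) but passes (name, namespace) to the builder.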
basicManager::GetPodByFullName()
GetPodByFullName() looks up a pod by its podFullName.
    func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
        pm.lock.RLock()
        defer pm.lock.RUnlock()
        pod, ok := pm.podByFullName[podFullName]
        return pod, ok
    }
basicManager::TranslatePodUID()
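TranslatePodUID() returns the UID of the pod that corresponds to a mirrorPod's UID. A UID with no entry in translationByUID, including the empty UID, is returned unchanged.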
    func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID {
        if uid == "" {
            return uid
        }

        pm.lock.RLock()
        defer pm.lock.RUnlock()
        if translated, ok := pm.translationByUID[uid]; ok {
            return translated
        }
        return uid
    }
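The pass-through behaviour is easiest to see with a few inputs. This is a lock-free sketch of the same lookup; `translate` and the sample UIDs are made up for illustration.

```go
package main

import "fmt"

type UID string

// translate returns the static pod UID for a mirror pod UID and returns
// any other UID (including the empty one) unchanged.
func translate(translationByUID map[UID]UID, uid UID) UID {
	if uid == "" {
		return uid
	}
	if translated, ok := translationByUID[uid]; ok {
		return translated
	}
	return uid
}

func main() {
	table := map[UID]UID{"mirror-1": "static-1"}
	for _, uid := range []UID{"mirror-1", "static-1", "api-pod", ""} {
		fmt.Printf("%q -> %q\n", uid, translate(table, uid))
	}
}
```

Since unknown UIDs pass through, a caller can translate every UID it receives without first checking whether it belongs to a mirrorPod.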
basicManager::GetUIDTranslations()
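GetUIDTranslations() returns the UID mappings in both directions: pod to mirrorPod and mirrorPod to pod. A static pod that has no mirrorPod yet maps to an empty UID in podToMirror.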
    func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) {
        pm.lock.RLock()
        defer pm.lock.RUnlock()

        podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID))
        mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID))

        for uid, pod := range pm.podByUID {
            if !IsStaticPod(pod) {
                continue
            }
            podToMirror[uid] = ""
        }

        for k, v := range pm.translationByUID {
            mirrorToPod[k] = v
            podToMirror[v] = k
        }
        return podToMirror, mirrorToPod
    }
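The two loops give podToMirror three possible states per pod: absent (not a static pod), present with an empty value (static pod without a mirror), and present with a mirror UID. A small sketch of that, where the `Static` field is a hypothetical shortcut for IsStaticPod(pod):

```go
package main

import "fmt"

type UID string

type Pod struct {
	UID    UID
	Static bool // hypothetical flag standing in for IsStaticPod(pod)
}

func uidTranslations(podByUID map[UID]*Pod, translationByUID map[UID]UID) (podToMirror, mirrorToPod map[UID]UID) {
	podToMirror = make(map[UID]UID, len(translationByUID))
	mirrorToPod = make(map[UID]UID, len(translationByUID))
	// Seed every static pod with an empty mirror UID.
	for uid, pod := range podByUID {
		if pod.Static {
			podToMirror[uid] = ""
		}
	}
	// Overwrite the entries of static pods that do have a mirror.
	for mirrorUID, podUID := range translationByUID {
		mirrorToPod[mirrorUID] = podUID
		podToMirror[podUID] = mirrorUID
	}
	return podToMirror, mirrorToPod
}

func main() {
	pods := map[UID]*Pod{
		"s1":  {UID: "s1", Static: true},
		"s2":  {UID: "s2", Static: true}, // mirror not created yet
		"api": {UID: "api"},
	}
	podToMirror, mirrorToPod := uidTranslations(pods, map[UID]UID{"m1": "s1"})
	fmt.Println(podToMirror, mirrorToPod)
}
```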
basicManager::DeleteOrphanedMirrorPods()
A mirrorPod that appears in mirrorPodByFullName but has no counterpart in podByFullName is an orphaned mirrorPod. DeleteOrphanedMirrorPods() asks the MirrorClient to delete each of these orphaned mirrorPods.
    func (pm *basicManager) DeleteOrphanedMirrorPods() {
        podFullNames := pm.getOrphanedMirrorPodNames()
        for _, podFullName := range podFullNames {
            pm.MirrorClient.DeleteMirrorPod(podFullName)
        }
    }

    func (pm *basicManager) getOrphanedMirrorPodNames() []string {
        pm.lock.RLock()
        defer pm.lock.RUnlock()
        var podFullNames []string
        for podFullName := range pm.mirrorPodByFullName {
            if _, ok := pm.podByFullName[podFullName]; !ok {
                podFullNames = append(podFullNames, podFullName)
            }
        }
        return podFullNames
    }
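The orphan check is a set difference over full names, followed by one delete call per result. The sketch below replaces the API-server-backed client with a recorder so the effect can be observed; `mirrorDeleter`, `recordingClient`, and `deleteOrphans` are hypothetical names, and the sort is added only to make the example's output stable.

```go
package main

import (
	"fmt"
	"sort"
)

// mirrorDeleter is a hypothetical one-method slice of MirrorClient.
type mirrorDeleter interface {
	DeleteMirrorPod(podFullName string) error
}

// recordingClient remembers which mirror pods it was asked to delete.
type recordingClient struct{ deleted []string }

func (c *recordingClient) DeleteMirrorPod(podFullName string) error {
	c.deleted = append(c.deleted, podFullName)
	return nil
}

// orphanedMirrorPodNames lists full names present among the mirror pods
// but absent among the regular pods.
func orphanedMirrorPodNames(pods, mirrors map[string]bool) []string {
	var names []string
	for name := range mirrors {
		if !pods[name] {
			names = append(names, name)
		}
	}
	sort.Strings(names) // map iteration order is random
	return names
}

func deleteOrphans(c mirrorDeleter, pods, mirrors map[string]bool) {
	for _, name := range orphanedMirrorPodNames(pods, mirrors) {
		c.DeleteMirrorPod(name)
	}
}

func main() {
	pods := map[string]bool{"etcd_kube-system": true}
	mirrors := map[string]bool{"etcd_kube-system": true, "old_kube-system": true}
	c := &recordingClient{}
	deleteOrphans(c, pods, mirrors)
	fmt.Println(c.deleted) // [old_kube-system]
}
```

As in the code above, the names are collected first and the client is called afterwards, so the read lock in getOrphanedMirrorPodNames() is not held during the delete calls.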
basicManager::IsMirrorPodOf()
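IsMirrorPodOf() checks whether a mirrorPod matches a pod: the name and namespace must be equal, and the hash recorded on the mirrorPod must equal the pod's hash.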
    func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool {
        if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {
            return false
        }
        hash, ok := getHashFromMirrorPod(mirrorPod)
        if !ok {
            return false
        }
        return hash == getPodHash(pod)
    }
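The hash comparison is what detects a mirrorPod that has gone stale after the static pod's definition changed. A minimal sketch with a simplified `Pod` type (hypothetical) and the two annotation keys named in this article:

```go
package main

import "fmt"

const (
	mirrorKey = "kubernetes.io/config.mirror" // read from the mirror pod
	hashKey   = "kubernetes.io/config.hash"   // read from the static pod
)

type Pod struct {
	Name, Namespace string
	Annotations     map[string]string
}

// isMirrorPodOf reports whether mirror still matches pod: same identity and
// a mirror annotation equal to the pod's config hash.
func isMirrorPodOf(mirror, pod *Pod) bool {
	if pod.Name != mirror.Name || pod.Namespace != mirror.Namespace {
		return false
	}
	hash, ok := mirror.Annotations[mirrorKey]
	if !ok {
		return false
	}
	return hash == pod.Annotations[hashKey]
}

func main() {
	static := &Pod{Name: "etcd", Namespace: "kube-system",
		Annotations: map[string]string{hashKey: "abc"}}
	mirror := &Pod{Name: "etcd", Namespace: "kube-system",
		Annotations: map[string]string{mirrorKey: "abc"}}
	fmt.Println(isMirrorPodOf(mirror, static)) // true

	// A changed static pod carries a new hash; the old mirror no longer matches.
	static.Annotations[hashKey] = "def"
	fmt.Println(isMirrorPodOf(mirror, static)) // false
}
```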
basicMirrorClient
Next, the MirrorClient. It is implemented by basicMirrorClient, defined in /pkg/kubelet/pod/mirror_client.go:
    type basicMirrorClient struct {
        apiserverClient clientset.Interface
    }
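mirrorPods are manipulated through basicMirrorClient. Because basicManager embeds the MirrorClient interface, which basicMirrorClient implements, podManager can manipulate mirrorPods as well.
A basicMirrorClient is created with NewBasicMirrorClient():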
    func NewBasicMirrorClient(apiserverClient clientset.Interface) MirrorClient {
        return &basicMirrorClient{apiserverClient: apiserverClient}
    }
basicMirrorClient::CreateMirrorPod()
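CreateMirrorPod() creates the mirrorPod for a pod. It copies the pod, stores the pod's hash under kubetypes.ConfigMirrorAnnotationKey in the copy's annotations, and submits the copy to the API server.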
    func (mc *basicMirrorClient) CreateMirrorPod(pod *api.Pod) error {
        if mc.apiserverClient == nil {
            return nil
        }

        copyPod := *pod
        copyPod.Annotations = make(map[string]string)

        for k, v := range pod.Annotations {
            copyPod.Annotations[k] = v
        }
        hash := getPodHash(pod)
        copyPod.Annotations[kubetypes.ConfigMirrorAnnotationKey] = hash
        apiPod, err := mc.apiserverClient.Core().Pods(copyPod.Namespace).Create(&copyPod)
        if err != nil && errors.IsAlreadyExists(err) {
            if h, ok := apiPod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok && h == hash {
                return nil
            }
        }
        return err
    }
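The explicit map copy matters because `copyPod := *pod` is a shallow copy: without a fresh Annotations map, writing the mirror annotation would also modify the static pod. A sketch of just that step, with no API call; `Pod` and `mirrorOf` are hypothetical.

```go
package main

import "fmt"

const (
	mirrorKey = "kubernetes.io/config.mirror"
	hashKey   = "kubernetes.io/config.hash"
)

type Pod struct {
	Name        string
	Annotations map[string]string
}

// mirrorOf builds the object that would be submitted as the mirror pod.
func mirrorOf(pod *Pod) Pod {
	copyPod := *pod // shallow copy: the Annotations map is still shared
	copyPod.Annotations = make(map[string]string, len(pod.Annotations)+1)
	for k, v := range pod.Annotations {
		copyPod.Annotations[k] = v
	}
	copyPod.Annotations[mirrorKey] = pod.Annotations[hashKey]
	return copyPod
}

func main() {
	static := &Pod{Name: "etcd", Annotations: map[string]string{hashKey: "abc"}}
	mirror := mirrorOf(static)
	_, leaked := static.Annotations[mirrorKey]
	fmt.Println(mirror.Annotations[mirrorKey], leaked) // abc false
}
```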
basicMirrorClient::DeleteMirrorPod()
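DeleteMirrorPod() deletes a mirrorPod. A failed delete is only logged; the function returns an error only when the full name cannot be parsed.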
    func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
        if mc.apiserverClient == nil {
            return nil
        }
        name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
        if err != nil {
            glog.Errorf("Failed to parse a pod full name %q", podFullName)
            return err
        }
        glog.V(2).Infof("Deleting a mirror pod %q", podFullName)

        if err := mc.apiserverClient.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil && !errors.IsNotFound(err) {
            glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
        }
        return nil
    }
IsStaticPod()
IsStaticPod() decides whether a pod is a static pod from the source recorded in its annotations: the source must be known and must not be the API server.
    func IsStaticPod(pod *api.Pod) bool {
        source, err := kubetypes.GetPodSource(pod)
        return err == nil && source != kubetypes.ApiserverSource
    }
Four sources are defined:
    const (
        FileSource      = "file"
        HTTPSource      = "http"
        ApiserverSource = "api"
        AllSource       = "*"
    )
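To make the rule concrete, the sketch below evaluates it for each source value. The annotation key `kubernetes.io/config.source` and the helper `podSource` are assumptions standing in for kubetypes.GetPodSource, which is not shown in this article.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	fileSource      = "file"
	httpSource      = "http"
	apiserverSource = "api"

	// sourceKey is assumed; the article does not show the annotation key.
	sourceKey = "kubernetes.io/config.source"
)

// podSource stands in for kubetypes.GetPodSource.
func podSource(annotations map[string]string) (string, error) {
	if source, ok := annotations[sourceKey]; ok {
		return source, nil
	}
	return "", errors.New("pod has no source annotation")
}

// isStatic is true only when the source is known and is not the API server.
func isStatic(annotations map[string]string) bool {
	source, err := podSource(annotations)
	return err == nil && source != apiserverSource
}

func main() {
	for _, a := range []map[string]string{
		{sourceKey: fileSource},
		{sourceKey: httpSource},
		{sourceKey: apiserverSource},
		nil, // no annotation: source unknown, so not treated as static
	} {
		fmt.Printf("%q static=%v\n", a[sourceKey], isStatic(a))
	}
}
```

A pod whose source cannot be determined is treated as non-static, because the `err == nil` condition fails.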
IsMirrorPod()
IsMirrorPod() decides whether a pod is a mirrorPod by checking for the "kubernetes.io/config.mirror" key in its annotations.
    func IsMirrorPod(pod *api.Pod) bool {
        _, ok := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]
        return ok
    }
getHashFromMirrorPod()
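getHashFromMirrorPod() reads the value stored under kubetypes.ConfigMirrorAnnotationKey, i.e. "kubernetes.io/config.mirror", in the mirrorPod's annotations. That value is the hash of the pod the mirrorPod was created from.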
    func getHashFromMirrorPod(pod *api.Pod) (string, bool) {
        hash, ok := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]
        return hash, ok
    }
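The value under kubetypes.ConfigMirrorAnnotationKey is written by CreateMirrorPod(), shown earlier. The hash it copies, stored under kubetypes.ConfigHashAnnotationKey, is set in applyDefaults() in /pkg/kubelet/config/common.go.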
getPodHash()
getPodHash() reads the value stored under kubetypes.ConfigHashAnnotationKey, i.e. "kubernetes.io/config.hash", in the pod's annotations.
    func getPodHash(pod *api.Pod) string {
        return pod.Annotations[kubetypes.ConfigHashAnnotationKey]
    }