podManager

podManager在kubelet中负责内存中pod及mirrorPod的维护,任何在本节点上创建或删除操作都会同步更新podManager,即可以认为podManager中存储了本节点上运行的pod的信息。这里要对mirrorPod做下说明,在kubernetes中,如果是static pod,则由kubelet直接创建,这时系统很难管理这部分pod;所以系统会在kubelet中创建一个static pod对应的mirrorPod,来表示static pod。
podManager定义在/pkg/kubelet/pod/pod_manager.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type basicManager struct {
// Protects all internal maps.
lock sync.RWMutex
//***UID与pod的映射***//
// Regular pods indexed by UID.
podByUID map[types.UID]*api.Pod
// Mirror pods indexed by UID.
mirrorPodByUID map[types.UID]*api.Pod
//***FullName与pod的映射***//
// Pods indexed by full name for easy access.
podByFullName map[string]*api.Pod
mirrorPodByFullName map[string]*api.Pod
// Mirror pod UID to pod UID map.
translationByUID map[types.UID]types.UID
// A mirror pod client to create/delete mirror pods.
MirrorClient
}

basicManager即所说的podManager,主要字段含义如下:

  1. podByUID: 记录uid和pod的map关系;
  2. mirrorPodByUID: 记录mirrorPod的uid和mirrorPod的map关系;
  3. podByFullName: 记录fullName和pod的map关系;
  4. mirrorPodByFullName: 记录fullName和mirrorPod的map关系;
  5. translationByUID: 记录mirrorPod的uid和pod的uid的map关系;
  6. MirrorClient: 可以管理mirrorPod。

basicManager可以通过NewBasicPodManager()生成:

1
2
3
4
5
6
7
//***生成新的basicManager***//
func NewBasicPodManager(client MirrorClient) Manager {
pm := &basicManager{}
pm.MirrorClient = client
pm.SetPods(nil)
return pm
}

basicManager::SetPods()

SetPods()可以把basicManager中的pods信息更换成newPods的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
//***依据newPods初始化basicManager***//
func (pm *basicManager) SetPods(newPods []*api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
pm.podByUID = make(map[types.UID]*api.Pod)
pm.podByFullName = make(map[string]*api.Pod)
pm.mirrorPodByUID = make(map[types.UID]*api.Pod)
pm.mirrorPodByFullName = make(map[string]*api.Pod)
pm.translationByUID = make(map[types.UID]types.UID)
pm.updatePodsInternal(newPods...)
}

basicManager::AddPod()

AddPod()可以往basicManager中添加pod。

1
2
3
4
//***在basicManager中添加pod***//
func (pm *basicManager) AddPod(pod *api.Pod) {
pm.UpdatePod(pod)
}

basicManager::UpdatePod()

UpdatePod()可以向basicManager中更新pod。

1
2
3
4
5
6
//***在basicManager中更新pod***//
func (pm *basicManager) UpdatePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
pm.updatePodsInternal(pod)
}

basicManager::updatePodsInternal()

updatePodsInternal()更新pods。如果pod的类型为mirrorPod,则更新mirroPodByUID, mirrorPodByFullName和translationByUID;如果是pod,则更新podByUID和podByFullName。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//***具体更新的实现***//
func (pm *basicManager) updatePodsInternal(pods ...*api.Pod) {
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
//***更新mirrorPodByUID及mirrorPodByFullName***//
//***设置mirrorPod和pod的映射关系***//
pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod
if p, ok := pm.podByFullName[podFullName]; ok {
pm.translationByUID[pod.UID] = p.UID
}
} else {
//***更新podByUID及podByFullName***//
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
pm.translationByUID[mirror.UID] = pod.UID
}
}
}
}

basicManager::DeletePod()

DeletePod()从basicManager中删除pod。如果pod是mirrorPod,则清理mirrorPodByUID, mirrorPodByFullName和translationByUID中pod相关内容;如果是pod,则只清理podByUID和podByFullName中pod相关内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//***从basicManager中删除pod***//
func (pm *basicManager) DeletePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
if IsMirrorPod(pod) {
delete(pm.mirrorPodByUID, pod.UID)
delete(pm.mirrorPodByFullName, podFullName)
delete(pm.translationByUID, pod.UID)
} else {
delete(pm.podByUID, pod.UID)
delete(pm.podByFullName, podFullName)
}
}

basicManager::GetPods()

GetPods()可以获取podByUID中的所有pod。

1
2
3
4
5
6
//***获取basicManager中的pod列表***//
func (pm *basicManager) GetPods() []*api.Pod {
pm.lock.RLock()
defer pm.lock.RUnlock()
return podsMapToPods(pm.podByUID)
}

basicManager::GetPodsAndMirrorPods()

GetPodsAndMirrorPods()可以获取pods和mirrorPods。

1
2
3
4
5
6
7
8
//***获取basicManager中的pod列表及mirrorPod列表***//
func (pm *basicManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pods := podsMapToPods(pm.podByUID)
mirrorPods := podsMapToPods(pm.mirrorPodByUID)
return pods, mirrorPods
}

basicManager::GetPodByUID()

GetPodByUID()可以获取指定uid的pod。

1
2
3
4
5
6
7
//***通过uid获取pod***//
func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pod, ok := pm.podByUID[uid]
return pod, ok
}

basicManager::GetPodByName()

GetPodByName()可以通过namespace和name获取指定pod。

1
2
3
4
5
//***通过namespace和name获取Pod***//
func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
podFullName := kubecontainer.BuildPodFullName(name, namespace)
return pm.GetPodByFullName(podFullName)
}

basicManager::GetPodByFullName()

GetPodByFullName()可以通过podFullName获取指定pod。

1
2
3
4
5
6
7
//***通过podFullName获取pod***//
func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pod, ok := pm.podByFullName[podFullName]
return pod, ok
}

basicManager::TranslatePodUID()

TranslatePodUID()可以通过mirrorPod的uid获取对应pod的uid。

1
2
3
4
5
6
7
8
9
10
11
12
13
//***通过mirrorPod的uid获取pod的uid***//
func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}
pm.lock.RLock()
defer pm.lock.RUnlock()
if translated, ok := pm.translationByUID[uid]; ok {
return translated
}
return uid
}

basicManager::GetUIDTranslations()

GetUIDTranslations()可以获取pod到mirrorPod及mirrorPod到pod的映射关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//***获取pod到mirrorPod及mirrorPod到pod的映射关系***//
func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types.UID]types.UID) {
pm.lock.RLock()
defer pm.lock.RUnlock()
podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID))
mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID))
// Insert empty translation mapping for all static pods.
for uid, pod := range pm.podByUID {
if !IsStaticPod(pod) {
continue
}
podToMirror[uid] = ""
}
// Fill in translations. Notice that if there is no mirror pod for a
// static pod, its uid will be translated into empty string "". This
// is WAI, from the caller side we can know that the static pod doesn't
// have a corresponding mirror pod instead of using static pod uid directly.
for k, v := range pm.translationByUID {
mirrorToPod[k] = v
podToMirror[v] = k
}
return podToMirror, mirrorToPod
}

basicManager::DeleteOrphanedMirrorPods()

如果在mirrorPodByFullName中存在,但在podByFullName中不存在,则该mirrorPod为orphanedMirrorPod,DeleteOrphanedMirrorPods()会清除这些orphanedMirrorPod。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//***删除orphanedMirrorPod***//
func (pm *basicManager) DeleteOrphanedMirrorPods() {
podFullNames := pm.getOrphanedMirrorPodNames()
for _, podFullName := range podFullNames {
pm.MirrorClient.DeleteMirrorPod(podFullName)
}
}
//***如果mirrorPodByFullName中存在,podByFullName中不存在,则认为该mirrorPod为orphanedMirrorPod***//
func (pm *basicManager) getOrphanedMirrorPodNames() []string {
pm.lock.RLock()
defer pm.lock.RUnlock()
var podFullNames []string
for podFullName := range pm.mirrorPodByFullName {
if _, ok := pm.podByFullName[podFullName]; !ok {
podFullNames = append(podFullNames, podFullName)
}
}
return podFullNames
}

basicManager::IsMirrorPodOf()

IsMirrorPodOf()可以检查mirrorPod和pod是否匹配。

1
2
3
4
5
6
7
8
9
10
11
12
//***检查mirrorPod和pod是否匹配***//
func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool {
// Check name and namespace first.
if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {
return false
}
hash, ok := getHashFromMirrorPod(mirrorPod)
if !ok {
return false
}
return hash == getPodHash(pod)
}

basicMirrorClient()

再来看下mirrorClient,由basicMirrorClient实现,定义在/pkg/kubelet/pod/mirror_Client.go中:

1
2
3
4
5
6
// basicMirrorClient is a functional MirrorClient. Mirror pods are stored in
// the kubelet directly because they need to be in sync with the internal
// pods.
type basicMirrorClient struct {
apiserverClient clientset.Interface
}

可以通过basicMirrorClient操作mirrorPod。由于podManager中嵌入了basicMirrorClient,所以podManager也有操作mirrorPod的能力。

basicMirrorClient可以使用NewBasicMirrorClient()方法生成:

1
2
3
4
// NewBasicMirrorClient returns a new MirrorClient.
func NewBasicMirrorClient(apiserverClient clientset.Interface) MirrorClient {
return &basicMirrorClient{apiserverClient: apiserverClient}
}

basicMirrorClient::CreateMirrorPod()

CreateMirrorPod()可以创建pod对应的mirrorPod。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//***创建mirror pod***//
func (mc *basicMirrorClient) CreateMirrorPod(pod *api.Pod) error {
if mc.apiserverClient == nil {
return nil
}
// Make a copy of the pod.
copyPod := *pod
copyPod.Annotations = make(map[string]string)
for k, v := range pod.Annotations {
copyPod.Annotations[k] = v
}
hash := getPodHash(pod)
copyPod.Annotations[kubetypes.ConfigMirrorAnnotationKey] = hash
//***创建mirror pod***//
apiPod, err := mc.apiserverClient.Core().Pods(copyPod.Namespace).Create(&copyPod)
if err != nil && errors.IsAlreadyExists(err) {
// Check if the existing pod is the same as the pod we want to create.
if h, ok := apiPod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok && h == hash {
return nil
}
}
return err
}

basicMirrorClient::DeleteMirrorPod()

DeleteMirrorPod()可以删除mirrorPod。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//***删除mirror pod***//
func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
if mc.apiserverClient == nil {
return nil
}
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
glog.Errorf("Failed to parse a pod full name %q", podFullName)
return err
}
glog.V(2).Infof("Deleting a mirror pod %q", podFullName)
// TODO(random-liu): Delete the mirror pod with uid precondition in mirror pod manager
if err := mc.apiserverClient.Core().Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil && !errors.IsNotFound(err) {
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
}
return nil
}

IsStaticPod()

IsStaticPod()可以依据pod中annotations中标记的来源来判断是否为staticPod。

1
2
3
4
5
6
//***通过pod source来判断是否为static pod***//
//***kubetypes.ApiserverSource为"api"***//
func IsStaticPod(pod *api.Pod) bool {
source, err := kubetypes.GetPodSource(pod)
return err == nil && source != kubetypes.ApiserverSource
}

系统中定义了4种来源:

1
2
3
4
5
6
7
8
// Updates from a file
FileSource = "file"
// Updates from querying a web page
HTTPSource = "http"
// Updates from Kubernetes API Server
ApiserverSource = "api"
// Updates from all sources
AllSource = "*"

IsMirrorPod()

IsMirrorPod()通过annotations中的”kubernetes.io/config.mirror” key来判断该pod是否为mirrorPod。

1
2
3
4
5
//***kubetypes.ConfigMirrorAnnotationKey为"kubernetes.io/config.mirror"***//
func IsMirrorPod(pod *api.Pod) bool {
_, ok := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]
return ok
}

getHashFromMirrorPod()

getHashFromMirrorPod()获取mirrorPod annotations中的kubetypes.ConfigMirrorAnnotationKey,即”kubernetes.io/config.hash”的值。

1
2
3
4
func getHashFromMirrorPod(pod *api.Pod) (string, bool) {
hash, ok := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]
return hash, ok
}

kubetypes.ConfigMirrorAnnotationKey的设置定义在/pkg/kubelet/config/common.go中的applyDefaults()中。

getPodHash()

getPodHash()获取pod annotations中的kubetypes.ConfigMirrorAnnotationKey,即”kubernetes.io/config.hash”的值。

1
2
3
4
func getPodHash(pod *api.Pod) string {
// The annotation exists for all static pods.
return pod.Annotations[kubetypes.ConfigHashAnnotationKey]
}