本次分析将介绍kubelet的volumeManager。volumeManager负责维护volume挂载与ETCD中数据的一致性。

actualStateOfWorld

actualStateOfWorld代表了目前物理主机上volume的挂载状态,定义在/pkg/kubelet/volumemanager/cache/actual_state_of_world.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// actualStateOfWorld is the volume manager's cache of the volumes it considers
// attached to this node and, per volume, the pods each one is mounted into.
// Its methods take the embedded sync.RWMutex before touching attachedVolumes.
type actualStateOfWorld struct {
// nodeName is the name of this node. This value is passed to Attach/Detach
nodeName types.NodeName
// attachedVolumes is a map containing the set of volumes the kubelet volume
// manager believes to be successfully attached to this node. Volume types
// that do not implement an attacher interface are assumed to be in this
// state by default.
// The key in this map is the name of the volume and the value is an object
// containing more information about the attached volume.
// This is the record of which volumes are attached.
attachedVolumes map[api.UniqueVolumeName]attachedVolume
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
// RWMutex guards attachedVolumes (read methods use RLock, mutators use Lock).
sync.RWMutex
}

主要字段含义如下:

  1. nodeName: 物理主机名称;
  2. attachedVolumes: volume名称到attachedVolume的映射,attachedVolume表示已经attach到本节点的volume(该volume被mount到了哪些pod,记录在其mountedPods中);
  3. volumePluginMgr: volume plugin管理者。

再来看下attachedVolume的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// attachedVolume is the actual-state-of-world record for one volume that is
// considered attached to this node, together with the pods it is mounted into.
type attachedVolume struct {
// volumeName contains the unique identifier for this volume.
volumeName api.UniqueVolumeName
// mountedPods is a map containing the set of pods that this volume has been
// successfully mounted to. The key in this map is the name of the pod and
// the value is a mountedPod object containing more information about the
// pod.
// In other words: the mapping from this volume to the pods it is mounted into.
mountedPods map[volumetypes.UniquePodName]mountedPod
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to plugin methods.
// In particular, the Unmount method uses spec.Name() as the volumeSpecName
// in the mount path:
// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{volumeSpecName}/
spec *volume.Spec
// pluginName is the Unescaped Qualified name of the volume plugin used to
// attach and mount this volume. It is stored separately in case the full
// volume spec (everything except the name) can not be reconstructed for a
// volume that should be unmounted (which would be the case for a mount path
// read from disk without a full volume spec).
pluginName string
// pluginIsAttachable indicates the volume plugin used to attach and mount
// this volume implements the volume.Attacher interface
pluginIsAttachable bool
// globallyMounted indicates that the volume is mounted to the underlying
// device at a global mount point. This global mount point must be unmounted
// prior to detach.
globallyMounted bool
// devicePath contains the path on the node where the volume is attached for
// attachable volumes
devicePath string
}

attachedVolume中有mountedPods字段,记录了该volume和使用该volume的pod的映射关系。

现在来看actualStateOfWorld实现的主要方法。

actualStateOfWorld::MarkVolumeAsAttached()

1
2
3
4
// MarkVolumeAsAttached records volumeName (described by volumeSpec) as attached
// to this node at devicePath by delegating to addVolume. The node-name argument
// is not used. Any error from addVolume is returned as-is.
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
	volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, _ types.NodeName, devicePath string) error {
	if err := asw.addVolume(volumeName, volumeSpec, devicePath); err != nil {
		return err
	}
	return nil
}

attach的过程在actualStateOfWorld中的表现就是把volume加入到actualStateOfWorld,通过调用addVolume()方法完成。

actualStateOfWorld::MarkVolumeAsDetached()

1
2
3
4
// MarkVolumeAsDetached removes volumeName from the actual state of world via
// DeleteVolume. nodeName is not used.
//
// DeleteVolume returns an error (and keeps the volume) when the volume still
// has mounted pods. This method has no error return, so that error is
// deliberately discarded here.
// NOTE(review): consider logging the discarded error.
func (asw *actualStateOfWorld) MarkVolumeAsDetached(
	volumeName api.UniqueVolumeName, nodeName types.NodeName) {
	_ = asw.DeleteVolume(volumeName)
}

detach的过程在actualStateOfWorld中的表现就是把volume从actualStateOfWorld中删除,通过调用DeleteVolume()完成。注意,DeleteVolume()返回的错误(例如该volume仍被某些pod挂载)在这里被直接忽略了。

actualStateOfWorld::MarkVolumeAsMounted()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// MarkVolumeAsMounted records that volumeName is mounted into the pod
// identified by podName/podUID, by adding the pod to the volume's mountedPods
// through AddPodToVolume. The error from AddPodToVolume is returned unchanged.
func (asw *actualStateOfWorld) MarkVolumeAsMounted(
	podName volumetypes.UniquePodName,
	podUID types.UID,
	volumeName api.UniqueVolumeName,
	mounter volume.Mounter,
	outerVolumeSpecName string,
	volumeGidValue string) error {
	err := asw.AddPodToVolume(
		podName, podUID, volumeName, mounter, outerVolumeSpecName, volumeGidValue)
	return err
}

mount的过程在actualStateOfWorld中的表现就是把pod信息记录到actualStateOfWorld的attachedVolume中,通过调用AddPodToVolume()完成。

actualStateOfWorld::MarkVolumeAsUnmounted()

1
2
3
4
5
// MarkVolumeAsUnmounted records that volumeName is no longer mounted into the
// pod podName. Only the pod is removed from the volume's mountedPods (via
// DeletePodFromVolume); the volume itself stays in the actual state of world.
func (asw *actualStateOfWorld) MarkVolumeAsUnmounted(
	podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) error {
	if err := asw.DeletePodFromVolume(podName, volumeName); err != nil {
		return err
	}
	return nil
}

unmount的过程在actualStateOfWorld中的表现就是把pod和volume的关系从actualStateOfWorld的attachedVolume中删除,通过DeletePodFromVolume()完成。

actualStateOfWorld::addVolume()

addVolume()把volume加入到actualStateOfWorld中:若该volume尚不存在,则为其新建一个attachedVolume;若已存在,则只更新其devicePath。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// addVolume records volumeSpec in the cache as attached to this node.
//
// The owning plugin is resolved from volumeSpec; a lookup error or a nil
// plugin yields an error. When volumeName is empty, a unique name is derived
// from the spec via the plugin. A new entry starts with no mounted pods and
// globallyMounted=false. If an entry with that name already exists, only its
// devicePath is refreshed.
func (asw *actualStateOfWorld) addVolume(
	volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string) error {
	asw.Lock()
	defer asw.Unlock()

	plugin, findErr := asw.volumePluginMgr.FindPluginBySpec(volumeSpec)
	if findErr != nil || plugin == nil {
		return fmt.Errorf(
			"failed to get Plugin from volumeSpec for volume %q err=%v",
			volumeSpec.Name(),
			findErr)
	}

	if len(volumeName) == 0 {
		generated, nameErr := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
		if nameErr != nil {
			return fmt.Errorf(
				"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
				volumeSpec.Name(),
				plugin.GetPluginName(),
				nameErr)
		}
		volumeName = generated
	}

	// The plugin counts as attachable iff it implements AttachableVolumePlugin.
	_, attachable := plugin.(volume.AttachableVolumePlugin)

	entry, found := asw.attachedVolumes[volumeName]
	if found {
		// Existing entry: keep everything but the device path.
		entry.devicePath = devicePath
		glog.V(2).Infof("Volume %q is already added to attachedVolume list, update device path %q",
			volumeName,
			devicePath)
	} else {
		entry = attachedVolume{
			volumeName:         volumeName,
			spec:               volumeSpec,
			mountedPods:        make(map[volumetypes.UniquePodName]mountedPod),
			pluginName:         plugin.GetPluginName(),
			pluginIsAttachable: attachable,
			globallyMounted:    false,
			devicePath:         devicePath,
		}
	}
	asw.attachedVolumes[volumeName] = entry
	return nil
}

actualStateOfWorld::DeleteVolume()

DeleteVolume()可以从actualStateOfWorld中删除volume。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// DeleteVolume removes volumeName from the actual state of world.
// Removing an unknown volume is a no-op. A volume that is still mounted into
// at least one pod is kept, and an error reporting the pod count is returned.
func (asw *actualStateOfWorld) DeleteVolume(volumeName api.UniqueVolumeName) error {
	asw.Lock()
	defer asw.Unlock()

	if entry, found := asw.attachedVolumes[volumeName]; found {
		if podCount := len(entry.mountedPods); podCount != 0 {
			return fmt.Errorf(
				"failed to DeleteVolume %q, it still has %v mountedPods",
				volumeName,
				podCount)
		}
		delete(asw.attachedVolumes, volumeName)
	}
	return nil
}

actualStateOfWorld::VolumeExists()

VolumeExists()可以返回volume在actualStateOfWorld中是否存在。

1
2
3
4
5
6
7
8
9
// VolumeExists reports whether volumeName is present in the actual state of
// world, i.e. whether it is currently recorded as attached.
func (asw *actualStateOfWorld) VolumeExists(
	volumeName api.UniqueVolumeName) bool {
	asw.RLock()
	_, found := asw.attachedVolumes[volumeName]
	asw.RUnlock()
	return found
}

actualStateOfWorld::AddPodToVolume()

AddPodToVolume()把pod的信息加入到volume对应的attachedVolume中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// AddPodToVolume records podName in the mountedPods of the attached volume
// volumeName. The volume must already be present; otherwise an error is
// returned.
//
// A new mountedPod entry is created from the given arguments. If the pod is
// already recorded, its existing entry is kept. In both cases remountRequired
// is cleared.
func (asw *actualStateOfWorld) AddPodToVolume(
	podName volumetypes.UniquePodName,
	podUID types.UID,
	volumeName api.UniqueVolumeName,
	mounter volume.Mounter,
	outerVolumeSpecName string,
	volumeGidValue string) error {
	asw.Lock()
	defer asw.Unlock()

	vol, found := asw.attachedVolumes[volumeName]
	if !found {
		return fmt.Errorf(
			"no volume with the name %q exists in the list of attached volumes",
			volumeName)
	}

	entry, alreadyMounted := vol.mountedPods[podName]
	if !alreadyMounted {
		entry = mountedPod{
			podName:             podName,
			podUID:              podUID,
			mounter:             mounter,
			outerVolumeSpecName: outerVolumeSpecName,
			volumeGidValue:      volumeGidValue,
		}
	}
	entry.remountRequired = false
	// vol.mountedPods is the same map stored in asw.attachedVolumes[volumeName].
	vol.mountedPods[podName] = entry
	return nil
}

actualStateOfWorld::DeletePodFromVolume()

DeletePodFromVolume()从actualStateOfWorld中volume对应的attachedVolume中删除pod信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// DeletePodFromVolume removes podName from the mountedPods of the attached
// volume volumeName. An unknown volume is an error. An unknown pod is silently
// ignored, because deleting a missing map key is a no-op.
func (asw *actualStateOfWorld) DeletePodFromVolume(
	podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) error {
	asw.Lock()
	defer asw.Unlock()

	vol, found := asw.attachedVolumes[volumeName]
	if !found {
		return fmt.Errorf(
			"no volume with the name %q exists in the list of attached volumes",
			volumeName)
	}
	delete(vol.mountedPods, podName)
	return nil
}

actualStateOfWorld::PodExistsInVolume()

PodExistsInVolume()可以返回pod是否挂载volume。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// PodExistsInVolume reports whether volumeName is mounted into podName in the
// actual state of world, along with the volume's device path.
//
// If the volume is not attached, it returns (false, "", volume-not-attached
// error). If the pod is recorded but flagged remountRequired, it returns true
// together with a remount-required error. Otherwise the error is nil.
func (asw *actualStateOfWorld) PodExistsInVolume(
	podName volumetypes.UniquePodName,
	volumeName api.UniqueVolumeName) (bool, string, error) {
	asw.RLock()
	defer asw.RUnlock()

	vol, attached := asw.attachedVolumes[volumeName]
	if !attached {
		return false, "", newVolumeNotAttachedError(volumeName)
	}

	entry, mounted := vol.mountedPods[podName]
	switch {
	case mounted && entry.remountRequired:
		return true, vol.devicePath, newRemountRequiredError(vol.volumeName, entry.podName)
	default:
		return mounted, vol.devicePath, nil
	}
}

desiredStateOfWorld

接着分析desiredStateOfWorld。desiredStateOfWorld表示系统希望得到的挂载状态,定义在/pkg/kubelet/volumemanager/cache/desired_state_of_world.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// desiredStateOfWorld is the volume manager's cache of which volumes should be
// attached to this node and which pods should have each of them mounted.
type desiredStateOfWorld struct {
// volumesToMount is a map containing the set of volumes that should be
// attached to this node and mounted to the pods referencing it. The key in
// the map is the name of the volume and the value is a volume object
// containing more information about the volume.
// Key: volume name; value: the corresponding volumeToMount.
volumesToMount map[api.UniqueVolumeName]volumeToMount
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
// RWMutex guards volumesToMount.
sync.RWMutex
}
// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
// It starts with an empty (non-nil) volumesToMount map and uses
// volumePluginMgr to resolve volume plugins.
func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
	dsw := &desiredStateOfWorld{volumePluginMgr: volumePluginMgr}
	dsw.volumesToMount = make(map[api.UniqueVolumeName]volumeToMount)
	return dsw
}

可以看到desiredStateOfWorld中有个volumesToMount的字段记录了volume名称和volumeToMount的映射。volumeToMount的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// The volume object represents a volume that should be attached to this node,
// and mounted to podsToMount.
type volumeToMount struct {
// volumeName contains the unique identifier for this volume.
volumeName api.UniqueVolumeName
// podsToMount is a map containing the set of pods that reference this
// volume and should mount it once it is attached. The key in the map is
// the name of the pod and the value is a pod object containing more
// information about the pod.
// Each podToMount value carries the pod's information.
podsToMount map[types.UniquePodName]podToMount
// pluginIsAttachable indicates that the plugin for this volume implements
// the volume.Attacher interface
pluginIsAttachable bool
// volumeGidValue contains the value of the GID annotation, if present.
volumeGidValue string
// reportedInUse indicates that the volume was successfully added to the
// VolumesInUse field in the node's status.
reportedInUse bool
}

来看下desiredStateOfWorld实现的方法.

desiredStateOfWorld::AddPodToVolume()

AddPodToVolume()把pod信息加入到desiredStateOfWorld的volumeToMount中。其中参数volumeSpec是最终的volume信息:如果pod使用的是PVC,会被转换为对应PV的信息;其他情况下则与pod中定义的volume信息相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// AddPodToVolume records that pod (podName) should have volumeSpec mounted,
// and returns the unique volume name the entry is stored under.
//
// The plugin for volumeSpec must resolve; otherwise an error is returned.
// Attachable volumes are keyed by the unique name reported by the plugin.
// Non-attachable volumes are keyed by a name built from the pod and the
// volume. A volumeToMount entry is created on first use. The pod's
// podToMount entry is always (re)written, so a repeated call refreshes it
// (needed for volumes that must be remounted on pod update, such as
// Downward API volumes).
func (dsw *desiredStateOfWorld) AddPodToVolume(
	podName types.UniquePodName,
	pod *api.Pod,
	volumeSpec *volume.Spec,
	outerVolumeSpecName string,
	volumeGidValue string) (api.UniqueVolumeName, error) {
	dsw.Lock()
	defer dsw.Unlock()

	plugin, err := dsw.volumePluginMgr.FindPluginBySpec(volumeSpec)
	if err != nil || plugin == nil {
		return "", fmt.Errorf(
			"failed to get Plugin from volumeSpec for volume %q err=%v",
			volumeSpec.Name(),
			err)
	}

	attachable := dsw.isAttachableVolume(volumeSpec)
	var volumeName api.UniqueVolumeName
	if !attachable {
		volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, plugin, volumeSpec)
	} else {
		volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
		if err != nil {
			return "", fmt.Errorf(
				"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
				volumeSpec.Name(),
				plugin.GetPluginName(),
				err)
		}
	}

	if _, known := dsw.volumesToMount[volumeName]; !known {
		dsw.volumesToMount[volumeName] = volumeToMount{
			volumeName:         volumeName,
			podsToMount:        make(map[types.UniquePodName]podToMount),
			pluginIsAttachable: attachable,
			volumeGidValue:     volumeGidValue,
			reportedInUse:      false,
		}
	}

	dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
		podName:             podName,
		pod:                 pod,
		spec:                volumeSpec,
		outerVolumeSpecName: outerVolumeSpecName,
	}
	return volumeName, nil
}

desiredStateOfWorld::DeletePodFromVolume()

DeletePodFromVolume()把pod从指定volumeName对应的volumeToMount中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// DeletePodFromVolume removes podName from the podsToMount of volumeName in
// the desired state of world. Unknown volumes or pods are ignored. When the
// removed pod was the last one referencing the volume, the volume entry is
// dropped as well.
func (dsw *desiredStateOfWorld) DeletePodFromVolume(
	podName types.UniquePodName, volumeName api.UniqueVolumeName) {
	dsw.Lock()
	defer dsw.Unlock()

	vol, found := dsw.volumesToMount[volumeName]
	if !found {
		return
	}
	if _, present := vol.podsToMount[podName]; !present {
		return
	}
	delete(vol.podsToMount, podName)
	if len(vol.podsToMount) == 0 {
		delete(dsw.volumesToMount, volumeName)
	}
}

desiredStateOfWorld::VolumeExists()

VolumeExists()返回volume是否在desiredStateOfWorld中。

1
2
3
4
5
6
7
8
9
// VolumeExists reports whether volumeName is present in volumesToMount, i.e.
// whether at least one pod is expected to mount it.
func (dsw *desiredStateOfWorld) VolumeExists(
	volumeName api.UniqueVolumeName) bool {
	dsw.RLock()
	defer dsw.RUnlock()
	if _, ok := dsw.volumesToMount[volumeName]; ok {
		return true
	}
	return false
}

desiredStateOfWorld::PodExistsInVolume()

PodExistsInVolume()可以返回pod是否需要挂载某volume。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// PodExistsInVolume reports whether podName is expected to mount volumeName
// according to the desired state of world.
func (dsw *desiredStateOfWorld) PodExistsInVolume(
	podName types.UniquePodName, volumeName api.UniqueVolumeName) bool {
	dsw.RLock()
	defer dsw.RUnlock()
	if vol, ok := dsw.volumesToMount[volumeName]; ok {
		_, wanted := vol.podsToMount[podName]
		return wanted
	}
	return false
}

desiredStateOfWorldPopulator

有了desiredStateOfWorld后,我们需要一个能维护desiredStateOfWorld中内容为最新内容的机制,这个机制就是desiredStateOfWorldPopulator。desiredStateOfWorldPopulator定义在/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go中:

1
2
3
4
5
6
7
8
9
10
// desiredStateOfWorldPopulator periodically syncs desiredStateOfWorld with the
// pods known to the kubelet pod manager: it adds volumes of new pods and removes
// volumes of pods that are gone (or terminated, for memory-backed volumes).
type desiredStateOfWorldPopulator struct {
// kubeClient: NOTE(review): not used in the methods shown here — presumably used when building volume specs; confirm.
kubeClient internalclientset.Interface
// loopSleepDuration is the period at which Run invokes the populator loop.
loopSleepDuration time.Duration
// getPodStatusRetryDuration is the minimum interval between container-runtime queries made by findAndRemoveDeletedPods.
getPodStatusRetryDuration time.Duration
// podManager is the source of the pods whose volumes are populated.
podManager pod.Manager
// desiredStateOfWorld is the cache this populator keeps up to date.
desiredStateOfWorld cache.DesiredStateOfWorld
// pods presumably backs podPreviouslyProcessed/markPodProcessed/deleteProcessedPod — TODO confirm.
pods processedPods
// kubeContainerRuntime is queried (GetPods) to check whether a removed pod still has containers.
kubeContainerRuntime kubecontainer.Runtime
// timeOfLastGetPodStatus is when the container runtime was last queried successfully.
timeOfLastGetPodStatus time.Time
}

可以看到,desiredStateOfWorldPopulator中有desiredStateOfWorld。
desiredStateOfWorldPopulator的启动函数为Run(),定义如下:

1
2
3
4
5
// Run starts the populator. Via wait.Until it invokes the function returned by
// populatorLoopFunc every loopSleepDuration until stopCh is closed.
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
	populate := dswp.populatorLoopFunc()
	wait.Until(populate, dswp.loopSleepDuration, stopCh)
}

可以看出,Run()主要周期地执行populatorLoopFunc():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// populatorLoopFunc returns the function executed on every populator tick.
//
// Each tick adds the volumes of newly seen pods to the desired state of world.
// Removing volumes of deleted pods calls out to the container runtime, which
// is expensive. That step therefore runs only once getPodStatusRetryDuration
// has elapsed since timeOfLastGetPodStatus; otherwise it is skipped and a
// V(5) message is logged.
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
	return func() {
		dswp.findAndAddNewPods()

		if time.Since(dswp.timeOfLastGetPodStatus) >= dswp.getPodStatusRetryDuration {
			dswp.findAndRemoveDeletedPods()
			return
		}
		glog.V(5).Infof(
			"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
			dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
			dswp.getPodStatusRetryDuration)
	}
}

populatorLoopFunc()的功能有两个:

  1. 往desiredStateOfWorld中添加新的pod,由findAndAddNewPods()完成;
  2. 从desiredStateOfWorld删除已经删除的pod,由findAndRemoveDeletedPods()完成。

所以从这里可以看出,pod被处理过一次之后就不会再被重新处理,因此我们无法改变一个已运行pod的挂载。

findAndAddNewPods()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// findAndAddNewPods feeds every non-terminated pod from the pod manager to
// processPodVolumes. Volumes of terminated pods are never (re)added.
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
	for _, candidate := range dswp.podManager.GetPods() {
		if !isPodTerminated(candidate) {
			dswp.processPodVolumes(candidate)
		}
	}
}
// processPodVolumes adds every volume declared in pod.Spec.Volumes to the
// desired state of world under the pod's unique name. A nil pod, or a pod that
// was already fully processed, is skipped.
//
// The pod is marked as processed only when ALL of its volumes were added
// successfully. If building a volume spec or adding it to the desired state
// fails, the error is logged and the pod stays unprocessed. The next populator
// iteration then retries it. Re-adding the volumes that already succeeded is
// harmless, because desiredStateOfWorld.AddPodToVolume just refreshes the pod's
// entry. Previously the pod was marked processed regardless of failures, so a
// failed volume was never retried for the pod's lifetime.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *api.Pod) {
	if pod == nil {
		return
	}

	uniquePodName := volumehelper.GetUniquePodName(pod)
	if dswp.podPreviouslyProcessed(uniquePodName) {
		return
	}

	allVolumesAdded := true
	for _, podVolume := range pod.Spec.Volumes {
		volumeSpec, volumeGidValue, err :=
			dswp.createVolumeSpec(podVolume, pod.Namespace)
		if err != nil {
			glog.Errorf(
				"Error processing volume %q for pod %q: %v",
				podVolume.Name,
				format.Pod(pod),
				err)
			allVolumesAdded = false
			continue
		}

		_, err = dswp.desiredStateOfWorld.AddPodToVolume(
			uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
		if err != nil {
			glog.Errorf(
				"Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v",
				podVolume.Name,
				volumeSpec.Name(),
				uniquePodName,
				err)
			allVolumesAdded = false
			// Do not fall through to the "Added volume" log below.
			continue
		}

		glog.V(10).Infof(
			"Added volume %q (volSpec=%q) for pod %q to desired state.",
			podVolume.Name,
			volumeSpec.Name(),
			uniquePodName)
	}

	// Only remember the pod once every volume is in the desired state, so that
	// volumes that failed above are retried on the next populator loop.
	if allVolumesAdded {
		dswp.markPodProcessed(uniquePodName)
	}
}

findAndAddNewPods()从podManager处获取pods,然后调用processPodVolumes()处理每个pod。
processPodVolumes()会调用desiredStateOfWorld的AddPodToVolume(),把pod的各个volume加入到desiredStateOfWorld中。desiredStateOfWorldPopulator会对处理过的pod进行记录,确保每个pod只被处理一次。

再来看findAndRemoveDeletedPods():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// findAndRemoveDeletedPods walks every (pod, volume) pair in the desired state
// of world and removes the pair once the pod should no longer hold the volume.
//
// A pair is kept while the pod is still known to the pod manager, unless the
// pod is terminated AND the volume is memory-backed (memory-medium emptyDir,
// ConfigMap or Secret). For the remaining pairs, the container runtime's pod
// list is fetched lazily. After one successful fetch it is reused for the rest
// of the call; a failed fetch is logged and retried for the next pair. The
// pair is removed only when the runtime reports no containers for the pod.
// Removal also clears the pod's processed mark.
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
	var (
		runtimePods   []*kubecontainer.Pod
		runtimeListed bool
	)
	for _, candidate := range dswp.desiredStateOfWorld.GetVolumesToMount() {
		if pod, known := dswp.podManager.GetPodByUID(candidate.Pod.UID); known {
			// Pods that are not terminated keep all of their volumes.
			if !isPodTerminated(pod) {
				continue
			}
			// Terminated pods only release memory-backed volumes here.
			podVol := candidate.VolumeSpec.Volume
			if podVol == nil {
				continue
			}
			memoryEmptyDir := podVol.EmptyDir != nil && podVol.EmptyDir.Medium == api.StorageMediumMemory
			if !memoryEmptyDir && podVol.ConfigMap == nil && podVol.Secret == nil {
				continue
			}
		}

		// Do not drop the volume immediately: first confirm with the container
		// runtime that no containers of the pod remain.
		if !runtimeListed {
			listed, listErr := dswp.kubeContainerRuntime.GetPods(false)
			if listErr != nil {
				glog.Errorf(
					"kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
					listErr)
				continue
			}
			runtimePods = listed
			runtimeListed = true
			dswp.timeOfLastGetPodStatus = time.Now()
		}

		hasContainers := false
		for _, rp := range runtimePods {
			if rp.ID != candidate.Pod.UID {
				continue
			}
			hasContainers = len(rp.Containers) > 0
			break
		}
		if hasContainers {
			glog.V(5).Infof(
				"Pod %q has been removed from pod manager. However, it still has one or more containers in the non-exited state. Therefore, it will not be removed from volume manager.",
				format.Pod(candidate.Pod))
			continue
		}

		glog.V(5).Infof(
			"Removing volume %q (volSpec=%q) for pod %q from desired state.",
			candidate.VolumeName,
			candidate.VolumeSpec.Name(),
			format.Pod(candidate.Pod))
		dswp.desiredStateOfWorld.DeletePodFromVolume(
			candidate.PodName, candidate.VolumeName)
		// Forget the pod so that it would be reprocessed if seen again.
		dswp.deleteProcessedPod(candidate.PodName)
	}
}

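findAndRemoveDeletedPods()遍历desiredStateOfWorld中的每个(pod, volume)。它处理两类pod:podManager中已经不存在的pod,以及已终止且该volume为内存型(memory介质的emptyDir、ConfigMap、Secret)的pod。对这些pod,它先通过容器运行时确认该pod已经没有容器,再把该pod从desiredStateOfWorld对应的volume中删除,并把pod从已处理记录表中删除。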

所以,现在我们已经完成了podManager和desiredStateOfWorld的同步。

reconciler

现在还需要维护两类一致性:actualStateOfWorld与desiredStateOfWorld之间的一致性,以及物理机上实际挂载情况与actualStateOfWorld之间的一致性。这些都由reconciler完成。
reconciler定义在/pkg/kubelet/volumemanager/reconciler/reconciler.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// reconciler drives the actual state of world toward the desired state of
// world by issuing attach/mount/unmount/detach operations through
// operationExecutor, and periodically reconciles on-disk state via sync().
type reconciler struct {
// kubeClient: NOTE(review): not used in the code shown here; confirm its consumer.
kubeClient internalclientset.Interface
// controllerAttachDetachEnabled: when true, kubelet does not attach/detach attachable volumes itself.
controllerAttachDetachEnabled bool
// loopSleepDuration is the period at which Run invokes the reconciliation loop.
loopSleepDuration time.Duration
// syncDuration is the minimum interval (measured from timeOfLastSync) between sync() runs.
syncDuration time.Duration
// waitForAttachTimeout is passed to operationExecutor.MountVolume.
waitForAttachTimeout time.Duration
// nodeName is the node passed to attach/verify/detach operations.
nodeName types.NodeName
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
// operationExecutor performs the actual (asynchronous) volume operations.
operationExecutor operationexecutor.OperationExecutor
// mounter is passed to operationExecutor.UnmountDevice.
mounter mount.Interface
// volumePluginMgr: NOTE(review): not used in the code shown here; presumably used by sync().
volumePluginMgr *volumepkg.VolumePluginMgr
// kubeletPodsDir: NOTE(review): presumably the pods directory scanned by sync(); confirm.
kubeletPodsDir string
// timeOfLastSync is compared with syncDuration to rate-limit sync().
timeOfLastSync time.Time
}

reconciler有desiredStateOfWorld和actualStateOfWorld,及operationExecutor用来执行具体的挂载操作。
reconciler启动方法如下:

1
2
3
// Run starts the reconciler. Via wait.Until it invokes the reconciliation loop
// every loopSleepDuration until stopCh is closed.
func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	loop := rc.reconciliationLoopFunc(sourcesReady)
	wait.Until(loop, rc.loopSleepDuration, stopCh)
}

reconciliationLoopFunc()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// reconciliationLoopFunc returns the function run on every reconciler tick.
//
// Each tick first reconciles the actual state of world against the desired
// state of world (rc.reconcile). It then runs rc.sync, which reconciles on-disk
// volume state with both caches. sync runs only when all pod sources are
// ready and more than syncDuration has passed since timeOfLastSync. Before all
// sources are ready, the desired state may be incomplete: reconstruction could
// then record partial volume information, or clean up volumes that are still
// in use.
func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
	return func() {
		rc.reconcile()

		if !sourcesReady.AllReady() {
			return
		}
		if time.Since(rc.timeOfLastSync) <= rc.syncDuration {
			return
		}
		glog.V(5).Infof("Sources are all ready, starting reconstruct state function")
		rc.sync()
	}
}

reconciliationLoopFunc()有两个功能:

  1. 处理actualStateOfWorld和desiredStateOfWorld的一致性,由reconcile()完成;
  2. 处理磁盘volume和actual state of world,desired state of world的一致性,由sync()完成。

先来看reconcile(),定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
//***处理actual state of world和desired state of world的一致性***//
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
// Ensure volumes that should be unmounted are unmounted.
//***Fankang***//
//***actualStateOfWorld中有,desiredStaeOfWorld中无,则umount***//
//***一个volume挂载到一个pod视为一个mountedVolume***//
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID)
//***Fankang***//
//***把volume unmount,并作适当清理***//
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).",
mountedVolume.VolumeName,
mountedVolume.OuterVolumeSpecName,
mountedVolume.PodName,
mountedVolume.PodUID)
}
}
}
// Ensure volumes that should be attached/mounted are attached/mounted.
//***Fankang***//
//***如果desire world中有,而actual world未挂载,则执行挂载***//
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
//***Fankang***//
//***处理volume not attached的情况***//
if cache.IsVolumeNotAttachedError(err) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
}
} else if !volMounted || cache.IsRemountRequiredError(err) {
//***Fankang***//
//***处理volume未被挂载,或需要重新挂载的情况***//
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
if cache.IsRemountRequiredError(err) {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
remountingLogStr)
//***Fankang***//
//***执行挂载***//
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
logMsg := fmt.Sprintf("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
remountingLogStr)
if remountingLogStr == "" {
glog.V(1).Infof(logMsg)
} else {
glog.V(5).Infof(logMsg)
}
}
}
}
// Ensure devices that should be detached/unmounted are detached/unmounted.
//***Fankang***//
//***处理actualStateOfWorld中需要unmount或detach的volume***//
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) {
if attachedVolume.GloballyMounted {
// Volume is globally mounted to device, unmount it
glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
}
} else {
// Volume is attached to node, detach it
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin,
// so just remove it to actualStateOfWorld without attach.
rc.actualStateOfWorld.MarkVolumeAsDetached(
attachedVolume.VolumeName, rc.nodeName)
} else {
// Only detach if kubelet detach is enabled
glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
rc.controllerAttachDetachEnabled,
err)
}
if err == nil {
glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name())
}
}
}
}
}
}

reconcile()主要处理以下三种情况:

  1. actualStateOfWorld中有,但desiredStateOfWorld中没有,则调用operationExecutor.UnmountVolume()执行unmount操作;
  2. desiredStateOfWorld中有,但actualStateOfWorld中未挂载,则执行attach操作或mount操作;
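  3. 对actualStateOfWorld中已unmount(但可能仍处于attach或全局挂载状态)的volume进行检查,如果desiredStateOfWorld中已没有该volume:若该volume仍处于全局挂载(GloballyMounted)状态,则执行UnmountDevice操作;否则,若detach由attach/detach controller负责或该volume plugin不可attach,则直接调用MarkVolumeAsDetached将其从actualStateOfWorld中移除,其余情况由kubelet执行DetachVolume操作。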

再来看sync():

1
2
3
4
// sync runs one state-reconstruction pass over the kubelet pods directory
// via syncStates. updateLastSyncTime is deferred, so it is invoked however
// syncStates returns.
func (rc *reconciler) sync() {
	podsDir := rc.kubeletPodsDir
	defer rc.updateLastSyncTime()
	rc.syncStates(podsDir)
}

sync()主要调用了syncStates():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// syncStates reconstructs volume information from the pod directories found
// under podsDir and puts volumes that are missing from the in-memory caches
// back into them (via updateStates), keeping the on-disk mounts consistent with
// the actual and desired states of world.
//
// Nothing is returned: failing to list volumes aborts this pass, failing to
// reconstruct a single volume skips only that volume, and an updateStates
// failure is logged.
func (rc *reconciler) syncStates(podsDir string) {
// Get volumes information by reading the pod's directory
// (i.e. discover which volumes are present on disk).
podVolumes, err := getVolumesFromPodDir(podsDir)
if err != nil {
glog.Errorf("Cannot get volumes from disk %v", err)
return
}
// Collect the reconstructed volumes that have to be written back to the states.
// NOTE(review): the map is keyed by volume name only, so if several on-disk
// entries reconstruct to the same volumeName (e.g. one volume used by more
// than one pod) only the last one survives — confirm updateStates does not
// need every pod entry.
volumesNeedUpdate := make(map[api.UniqueVolumeName]*reconstructedVolume)
for _, volume := range podVolumes {
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
glog.Errorf("Could not construct volume information: %v", err)
continue
}
// Check if there is a pending operation for the given pod and volume.
// Need to check pending operation before checking the actual and desired
// states to avoid race condition during checking. For example, the following
// might happen if pending operation is checked after checking actual and desired states.
// 1. Checking the pod and it does not exist in either actual or desired state.
// 2. An operation for the given pod finishes and the actual state is updated.
// 3. Checking and there is no pending operation for the given pod.
// During state reconstruction period, no new volume operations could be issued. If the
// mounted path is not in either pending operation, or actual or desired states, this
// volume needs to be reconstructed back to the states.
pending := rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, reconstructedVolume.podName)
dswExist := rc.desiredStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
aswExist, _, _ := rc.actualStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
// First pass after kubelet starts: the states have not been synced yet.
// Note that `pending` is only consulted in the else branch below.
if !rc.StatesHasBeenSynced() {
// In case this is the first time to reconstruct state after kubelet starts, for a persistent volume, it must have
// been mounted before kubelet restarts because no mount operations could be started at this time (node
// status has not yet been updated before this very first syncStates finishes, so that VerifyControllerAttachedVolume will fail),
// In this case, the volume state should be put back to actual state now no matter desired state has it or not.
// This is to prevent node status from being updated to empty for attachable volumes. This might happen because
// in the case that a volume is discovered on disk, and it is part of desired state, but is then quickly deleted
// from the desired state. If in such situation, the volume is not added to the actual state, the node status updater will
// not get this volume from either actual or desired state. In turn, this might cause master controller
// detaching while the volume is still mounted.
if aswExist || !reconstructedVolume.pluginIsAttachable {
continue
}
} else {
// Check pending first since no new operations could be started at this point.
// Otherwise there might be a race condition in checking actual states and pending operations.
// Skip the volume if it has a pending operation, or is already present in
// the desired or the actual state of world.
if pending || dswExist || aswExist {
continue
}
}
glog.V(2).Infof(
"Reconciler sync states: could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
}
// Write the collected volumes back into the states. After the first sync these
// are volumes found in neither the desired nor the actual state of world; during
// the first sync they may already be present in the desired state.
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
glog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
}
}
}

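syncStates()会从磁盘(kubelet的pods目录)上获取volume信息并重建volume状态。在kubelet启动后的第一次同步中,只跳过actualStateOfWorld中已存在的、或volume plugin不可attach的volume挂载,以保证重启前已挂载的可attach volume被放回actualStateOfWorld(无论desiredStateOfWorld中是否存在);之后的同步则排除状态为pending,或desiredStateOfWorld中存在,或actualStateOfWorld中存在的volume挂载。剩余的挂载通过updateStates()方法加到desiredStateOfWorld和actualStateOfWorld中,以后的事情就交给populator及reconciler来处理了,如果该volume挂载是多余的,也会被删除的。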

volumeManager

最后来看volumeManager,管理物理节点上的volume挂载。volumeManager定义在/pkg/kubelet/volumemanager/volume_manager.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// volumeManager implements the VolumeManager interface.
// It bundles the two state caches (desired and actual state of world) with the
// components that populate and reconcile them; NewVolumeManager shows how these
// fields are wired together.
type volumeManager struct {
// kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
// communicate with the API server to fetch PV and PVC objects.
// NewVolumeManager also hands it to the operation executor and the reconciler.
kubeClient internalclientset.Interface
// volumePluginMgr is the volume plugin manager used to access volume
// plugins. It must be pre-initialized.
volumePluginMgr *volume.VolumePluginMgr
// desiredStateOfWorld is a data structure containing the desired state of
// the world according to the volume manager (i.e. what volumes should be
// attached and which pods are referencing the volumes).
// The data structure is populated by the desired state of the world
// populator using the kubelet pod manager.
desiredStateOfWorld cache.DesiredStateOfWorld
// actualStateOfWorld is a data structure containing the actual state of
// the world according to the manager: i.e. which volumes are attached to
// this node and what pods the volumes are mounted to.
// The data structure is populated upon successful completion of attach,
// detach, mount, and unmount actions triggered by the reconciler.
actualStateOfWorld cache.ActualStateOfWorld
// operationExecutor is used to start asynchronous attach, detach, mount,
// and unmount operations.
operationExecutor operationexecutor.OperationExecutor
// reconciler runs an asynchronous periodic loop to reconcile the
// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
// detach, mount, and unmount operations using the operationExecutor.
reconciler reconciler.Reconciler
// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the desiredStateOfWorld using the kubelet PodManager.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}

volumeManager中有desiredStateOfWorld,actualStateOfWorld,operationExecutor,reconciler及desiredStateOfWorldPopulator。

volumeManager的生成方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// NewVolumeManager wires together the kubelet's volume management components
// and returns them as a VolumeManager.
//
// The desired and actual state caches are created first, then the operation
// executor that performs attach/detach/mount/unmount work. The reconciler and
// the desired-state populator are built on top of those shared caches. The
// returned error is currently always nil.
func NewVolumeManager(
	controllerAttachDetachEnabled bool,
	nodeName k8stypes.NodeName,
	podManager pod.Manager,
	kubeClient internalclientset.Interface,
	volumePluginMgr *volume.VolumePluginMgr,
	kubeContainerRuntime kubecontainer.Runtime,
	mounter mount.Interface,
	kubeletPodsDir string,
	recorder record.EventRecorder,
	checkNodeCapabilitiesBeforeMount bool) (VolumeManager, error) {
	// Shared state caches and the executor, created in the same order as the
	// original struct literal evaluated them.
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	executor := operationexecutor.NewOperationExecutor(
		kubeClient,
		volumePluginMgr,
		recorder,
		checkNodeCapabilitiesBeforeMount)

	// The reconciler drives the actual state towards the desired state.
	rc := reconciler.NewReconciler(
		kubeClient,
		controllerAttachDetachEnabled,
		reconcilerLoopSleepPeriod,
		reconcilerSyncStatesSleepPeriod,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		executor,
		mounter,
		volumePluginMgr,
		kubeletPodsDir)

	// The populator fills the desired state from the pod manager.
	dswp := populator.NewDesiredStateOfWorldPopulator(
		kubeClient,
		desiredStateOfWorldPopulatorLoopSleepPeriod,
		desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
		podManager,
		dsw,
		kubeContainerRuntime)

	return &volumeManager{
		kubeClient:                   kubeClient,
		volumePluginMgr:              volumePluginMgr,
		desiredStateOfWorld:          dsw,
		actualStateOfWorld:           asw,
		operationExecutor:            executor,
		reconciler:                   rc,
		desiredStateOfWorldPopulator: dswp,
	}, nil
}

volumeManager启动方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Run starts the volume manager's background loops and then blocks until a
// receive from stopCh completes (a value is sent or the channel is closed).
//
// The desired-state populator and the reconciler each run in their own
// goroutine and are given stopCh so they can stop together with the manager.
// runtime.HandleCrash is deferred to deal with panics raised in this goroutine.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()

	dswp, rc := vm.desiredStateOfWorldPopulator, vm.reconciler

	// Populator: keeps the desired state of world in line with the pods.
	go dswp.Run(stopCh)
	glog.V(2).Infof("The desired_state_of_world populator starts")

	// Reconciler: drives the actual state of world (and the mounts on disk)
	// towards the desired state of world.
	glog.Infof("Starting Kubelet Volume Manager")
	go rc.Run(sourcesReady, stopCh)

	<-stopCh
	glog.Infof("Shutting down Kubelet Volume Manager")
}

Run()会依次在独立的goroutine中启动desiredStateOfWorldPopulator及reconciler,然后阻塞等待stopCh,收到停止信号后退出。

总结

volumeManager通过actualStateOfWorld和desiredStateOfWorld来表明当前的volume挂载状态和期望的volume挂载状态。然后由desiredStateOfWorldPopulator维护desiredStateOfWorld和podManager的一致性;由reconciler维护actualStateOfWorld和desiredStateOfWorld的一致性及磁盘volume挂载和actualStateOfWorld的一致性。通过这些机制,volumeManager完成了volume挂载生命周期的管理。