本次分析将介绍kubelet的volumeManager。volumeManager负责维护volume挂载与ETCD中数据的一致性。
actualStateOfWorld

actualStateOfWorld代表了目前物理主机上volume的挂载状态,定义在/pkg/kubelet/volumemanager/cache/actual_state_of_world.go中:
// actualStateOfWorld is the kubelet's cache of which volumes are attached to
// this node and which pods currently have each of them mounted.
type actualStateOfWorld struct {
// nodeName is the name of the node this cache describes.
nodeName types.NodeName
// attachedVolumes maps a unique volume name to its attachedVolume entry.
attachedVolumes map [api.UniqueVolumeName]attachedVolume
// volumePluginMgr resolves the volume plugin that handles a given spec.
volumePluginMgr *volume.VolumePluginMgr
// The embedded RWMutex guards attachedVolumes: writers take Lock, readers RLock.
sync.RWMutex
}
主要字段含义如下:
nodeName: 物理主机名称;
attachedVolumes: volume名称到attachedVolume的映射,attachedVolume表示已经attach到本节点上的volume;
volumePluginMgr: volume plugin管理者。
再来看下attachedVolume的定义:
// attachedVolume is the actual-state entry for one volume attached to the node.
type attachedVolume struct {
// volumeName is the unique name of the volume (the key in attachedVolumes).
volumeName api.UniqueVolumeName
// mountedPods maps each pod that has this volume mounted to its mountedPod record.
mountedPods map [volumetypes.UniquePodName]mountedPod
// spec is the volume spec recorded when the volume was first added.
spec *volume.Spec
// pluginName is the name of the plugin that handles this volume.
pluginName string
// pluginIsAttachable is true when that plugin implements volume.AttachableVolumePlugin.
pluginIsAttachable bool
// globallyMounted starts false when the entry is created; it is presumably set
// once the device is mounted at its global mount path (setter not shown here).
globallyMounted bool
// devicePath is the device path reported by the most recent addVolume call.
devicePath string
}
attachedVolume中有mountedPods字段,记录了该volume和使用该volume的pod的映射关系。
现在来看actualStateOfWorld实现的主要方法。
actualStateOfWorld::MarkVolumeAsAttached()
// MarkVolumeAsAttached records that volumeName is attached to this node by
// adding it (with its spec and device path) to the actual state.
// The node-name argument is unused: this cache only describes its own nodeName.
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
	volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, _ types.NodeName, devicePath string) error {
	if err := asw.addVolume(volumeName, volumeSpec, devicePath); err != nil {
		return err
	}
	return nil
}
attach的过程在actualStateOfWorld中的表现就是把volume加入到actualStateOfWorld,通过调用addVolume()方法完成。
actualStateOfWorld::MarkVolumeAsDetached()
// MarkVolumeAsDetached records that volumeName is no longer attached to this
// node by removing it from the actual state. nodeName is unused.
//
// DeleteVolume refuses (with an error) while pods still have the volume
// mounted; this method has no error return, so that error is intentionally
// discarded and the volume simply remains in the actual state.
func (asw *actualStateOfWorld) MarkVolumeAsDetached(
	volumeName api.UniqueVolumeName, nodeName types.NodeName) {
	_ = asw.DeleteVolume(volumeName)
}
detach的过程在actualStateOfWorld中的表现就是把volume从actualStateOfWorld中删除,通过调用DeleteVolume()完成。
actualStateOfWorld::MarkVolumeAsMounted()
// MarkVolumeAsMounted records that podName has volumeName mounted by adding
// the pod (with its mounter, outer spec name and GID value) to the volume's
// entry in the actual state.
func (asw *actualStateOfWorld) MarkVolumeAsMounted(
	podName volumetypes.UniquePodName,
	podUID types.UID,
	volumeName api.UniqueVolumeName,
	mounter volume.Mounter,
	outerVolumeSpecName string,
	volumeGidValue string) error {
	err := asw.AddPodToVolume(podName, podUID, volumeName, mounter, outerVolumeSpecName, volumeGidValue)
	return err
}
mount的过程在actualStateOfWorld中的表现就是把pod信息记录到actualStateOfWorld的attachedVolume中,通过调用AddPodToVolume()完成。
actualStateOfWorld::MarkVolumeAsUnmounted()
// MarkVolumeAsUnmounted records that podName no longer has volumeName
// mounted by removing the pod from the volume's entry in the actual state.
func (asw *actualStateOfWorld) MarkVolumeAsUnmounted(
	podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) error {
	err := asw.DeletePodFromVolume(podName, volumeName)
	return err
}
unmount的过程在actualStateOfWorld中的表现就是把pod和volume的关系从actualStateOfWorld的attachedVolume中删除,通过DeletePodFromVolume()完成。
actualStateOfWorld::addVolume()

addVolume()可以把volume加入到actualStateOfWorld中,并为该volume生成对应的attachedVolume;如果该volume已经存在,则只更新它的devicePath。
// addVolume adds volumeName to the actual state.
//
// On first sight a new attachedVolume entry is created with no mounted pods;
// if the volume is already tracked, only its devicePath is refreshed. When
// volumeName is empty, the unique name is derived from volumeSpec through the
// plugin that handles it.
//
// It returns an error when no plugin matches volumeSpec or when the unique
// volume name cannot be derived.
func (asw *actualStateOfWorld) addVolume(
	volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string) error {
	asw.Lock()
	defer asw.Unlock()

	plugin, err := asw.volumePluginMgr.FindPluginBySpec(volumeSpec)
	if plugin == nil || err != nil {
		return fmt.Errorf(
			"failed to get Plugin from volumeSpec for volume %q err=%v",
			volumeSpec.Name(),
			err)
	}

	if volumeName == "" {
		var nameErr error
		volumeName, nameErr = volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
		if nameErr != nil {
			return fmt.Errorf(
				"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
				volumeSpec.Name(),
				plugin.GetPluginName(),
				nameErr)
		}
	}

	if existing, tracked := asw.attachedVolumes[volumeName]; tracked {
		// Already known: the device path is the only field refreshed here.
		existing.devicePath = devicePath
		glog.V(2).Infof("Volume %q is already added to attachedVolume list, update device path %q",
			volumeName,
			devicePath)
		asw.attachedVolumes[volumeName] = existing
		return nil
	}

	_, attachable := plugin.(volume.AttachableVolumePlugin)
	asw.attachedVolumes[volumeName] = attachedVolume{
		volumeName:         volumeName,
		spec:               volumeSpec,
		mountedPods:        make(map[volumetypes.UniquePodName]mountedPod),
		pluginName:         plugin.GetPluginName(),
		pluginIsAttachable: attachable,
		globallyMounted:    false,
		devicePath:         devicePath,
	}
	return nil
}
actualStateOfWorld::DeleteVolume()

DeleteVolume()可以从actualStateOfWorld中删除volume;如果该volume仍有pod挂载,则返回错误而不删除。
// DeleteVolume removes volumeName from the actual state.
//
// Deleting a volume that is not tracked is a no-op. While any pod is still
// recorded as having the volume mounted, the volume is kept and an error
// reporting the number of mounted pods is returned.
func (asw *actualStateOfWorld) DeleteVolume(volumeName api.UniqueVolumeName) error {
	asw.Lock()
	defer asw.Unlock()

	entry, found := asw.attachedVolumes[volumeName]
	switch {
	case !found:
		return nil
	case len(entry.mountedPods) > 0:
		return fmt.Errorf(
			"failed to DeleteVolume %q, it still has %v mountedPods",
			volumeName,
			len(entry.mountedPods))
	}

	delete(asw.attachedVolumes, volumeName)
	return nil
}
actualStateOfWorld::VolumeExists()

VolumeExists()返回volume在actualStateOfWorld中是否存在。
// VolumeExists reports whether volumeName is currently tracked in the actual state.
func (asw *actualStateOfWorld) VolumeExists(
	volumeName api.UniqueVolumeName) bool {
	asw.RLock()
	defer asw.RUnlock()

	if _, tracked := asw.attachedVolumes[volumeName]; tracked {
		return true
	}
	return false
}
actualStateOfWorld::AddPodToVolume()

AddPodToVolume()把pod的信息加入到volume对应的attachedVolume中。
// AddPodToVolume records that podName has volumeName mounted.
//
// The volume must already be tracked, otherwise an error is returned. A new
// mountedPod record is created the first time the pod is seen; an existing
// record keeps its original mounter/spec name/GID. In both cases any pending
// remount request for the pod is cleared.
func (asw *actualStateOfWorld) AddPodToVolume(
	podName volumetypes.UniquePodName,
	podUID types.UID,
	volumeName api.UniqueVolumeName,
	mounter volume.Mounter,
	outerVolumeSpecName string,
	volumeGidValue string) error {
	asw.Lock()
	defer asw.Unlock()

	vol, tracked := asw.attachedVolumes[volumeName]
	if !tracked {
		return fmt.Errorf(
			"no volume with the name %q exists in the list of attached volumes",
			volumeName)
	}

	record, seen := vol.mountedPods[podName]
	if !seen {
		record = mountedPod{
			podName:             podName,
			podUID:              podUID,
			mounter:             mounter,
			outerVolumeSpecName: outerVolumeSpecName,
			volumeGidValue:      volumeGidValue,
		}
	}
	// Mounting (again) satisfies any outstanding remount request.
	record.remountRequired = false

	// vol is a copy of the map entry, but vol.mountedPods refers to the same map.
	vol.mountedPods[podName] = record
	return nil
}
actualStateOfWorld::DeletePodFromVolume()

DeletePodFromVolume()从actualStateOfWorld中volume对应的attachedVolume中删除pod信息。
// DeletePodFromVolume removes podName from the set of pods that have
// volumeName mounted. It returns an error if the volume is not tracked;
// removing a pod that is not recorded for the volume is a no-op.
func (asw *actualStateOfWorld) DeletePodFromVolume(
	podName volumetypes.UniquePodName, volumeName api.UniqueVolumeName) error {
	asw.Lock()
	defer asw.Unlock()

	vol, tracked := asw.attachedVolumes[volumeName]
	if !tracked {
		return fmt.Errorf(
			"no volume with the name %q exists in the list of attached volumes",
			volumeName)
	}

	// The built-in delete is already a no-op for absent keys, so no prior
	// membership check is needed.
	delete(vol.mountedPods, podName)
	return nil
}
actualStateOfWorld::PodExistsInVolume()

PodExistsInVolume()返回pod是否已挂载该volume,同时返回volume的devicePath;如果该pod需要重新挂载该volume,则额外返回remountRequired错误。
// PodExistsInVolume reports whether podName has volumeName mounted and
// returns the volume's device path.
//
// If the volume is not attached, it returns a volume-not-attached error. If
// the pod has the volume mounted but a remount was requested, it returns true
// together with a remount-required error.
func (asw *actualStateOfWorld) PodExistsInVolume(
	podName volumetypes.UniquePodName,
	volumeName api.UniqueVolumeName) (bool, string, error) {
	asw.RLock()
	defer asw.RUnlock()

	vol, attached := asw.attachedVolumes[volumeName]
	if !attached {
		return false, "", newVolumeNotAttachedError(volumeName)
	}

	mp, mounted := vol.mountedPods[podName]
	if !mounted {
		return false, vol.devicePath, nil
	}
	if mp.remountRequired {
		return true, vol.devicePath, newRemountRequiredError(vol.volumeName, mp.podName)
	}
	return true, vol.devicePath, nil
}
desiredStateOfWorld

接着分析desiredStateOfWorld。desiredStateOfWorld表示系统期望达到的挂载状态,定义在/pkg/kubelet/volumemanager/cache/desired_state_of_world.go中:
// desiredStateOfWorld is the cache of volumes that pods on this node should
// have mounted, and which pods want each of them.
type desiredStateOfWorld struct {
// volumesToMount maps a unique volume name to the pods that need it mounted.
volumesToMount map [api.UniqueVolumeName]volumeToMount
// volumePluginMgr resolves the volume plugin that handles a given spec.
volumePluginMgr *volume.VolumePluginMgr
// The embedded RWMutex guards volumesToMount: writers take Lock, readers RLock.
sync.RWMutex
}
// NewDesiredStateOfWorld returns an empty DesiredStateOfWorld that resolves
// volume plugins through volumePluginMgr.
func NewDesiredStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorld {
	dsw := &desiredStateOfWorld{volumePluginMgr: volumePluginMgr}
	dsw.volumesToMount = make(map[api.UniqueVolumeName]volumeToMount)
	return dsw
}
可以看到desiredStateOfWorld中有个volumesToMount字段,记录了volume名称到volumeToMount的映射。volumeToMount的定义如下:
// volumeToMount is the desired-state entry for one volume.
type volumeToMount struct {
// volumeName is the unique name of the volume (the key in volumesToMount).
volumeName api.UniqueVolumeName
// podsToMount maps each pod that wants this volume mounted to its podToMount record.
podsToMount map [types.UniquePodName]podToMount
// pluginIsAttachable is whether the volume's plugin is attachable, as
// determined by isAttachableVolume when the entry was created.
pluginIsAttachable bool
// volumeGidValue is taken from the first pod that added this volume; later
// pods do not update it.
volumeGidValue string
// reportedInUse is initialized to false in AddPodToVolume; the code that
// updates it is not shown in this excerpt.
reportedInUse bool
}
来看下desiredStateOfWorld实现的方法。
desiredStateOfWorld::AddPodToVolume()

AddPodToVolume()把pod信息加入到desiredStateOfWorld的volumeToMount中。其中的参数volumeSpec是最终的volume信息:如果是PVC,会转换为对应PV的信息;其他情况则与pod中定义的volume信息相同。
// AddPodToVolume records that podName wants the volume described by
// volumeSpec mounted, and returns the unique volume name it was filed under.
//
// Attachable volumes are named from the spec alone, so every pod using the
// same attachable volume shares one entry. Non-attachable volumes get a name
// that includes podName. A volume entry is created on first use; its
// volumeGidValue comes from that first caller. The pod's podToMount record is
// (over)written on every call.
//
// It returns an error if no plugin matches volumeSpec or the unique name of an
// attachable volume cannot be derived.
func (dsw *desiredStateOfWorld) AddPodToVolume(
	podName types.UniquePodName,
	pod *api.Pod,
	volumeSpec *volume.Spec,
	outerVolumeSpecName string,
	volumeGidValue string) (api.UniqueVolumeName, error) {
	dsw.Lock()
	defer dsw.Unlock()

	plugin, err := dsw.volumePluginMgr.FindPluginBySpec(volumeSpec)
	if plugin == nil || err != nil {
		return "", fmt.Errorf(
			"failed to get Plugin from volumeSpec for volume %q err=%v",
			volumeSpec.Name(),
			err)
	}

	attachable := dsw.isAttachableVolume(volumeSpec)
	var volumeName api.UniqueVolumeName
	if !attachable {
		volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, plugin, volumeSpec)
	} else {
		volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
		if err != nil {
			return "", fmt.Errorf(
				"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
				volumeSpec.Name(),
				plugin.GetPluginName(),
				err)
		}
	}

	entry, known := dsw.volumesToMount[volumeName]
	if !known {
		entry = volumeToMount{
			volumeName:         volumeName,
			podsToMount:        make(map[types.UniquePodName]podToMount),
			pluginIsAttachable: attachable,
			volumeGidValue:     volumeGidValue,
			reportedInUse:      false,
		}
		dsw.volumesToMount[volumeName] = entry
	}

	// entry.podsToMount is the same map stored in dsw.volumesToMount.
	entry.podsToMount[podName] = podToMount{
		podName:             podName,
		pod:                 pod,
		spec:                volumeSpec,
		outerVolumeSpecName: outerVolumeSpecName,
	}
	return volumeName, nil
}
desiredStateOfWorld::DeletePodFromVolume()

DeletePodFromVolume()把pod从指定volumeName对应的volumeToMount中删除;如果删除后该volume已没有任何pod需要挂载,则把该volume也从desiredStateOfWorld中删除。
// DeletePodFromVolume removes podName from the pods that want volumeName.
//
// Nothing happens if the volume or the pod is not present. When the last pod
// is removed, the volume itself is dropped from the desired state.
func (dsw *desiredStateOfWorld) DeletePodFromVolume(
	podName types.UniquePodName, volumeName api.UniqueVolumeName) {
	dsw.Lock()
	defer dsw.Unlock()

	entry, known := dsw.volumesToMount[volumeName]
	if !known {
		return
	}
	// Only an actual removal may trigger dropping the (possibly empty) volume.
	if _, wanted := entry.podsToMount[podName]; !wanted {
		return
	}

	delete(entry.podsToMount, podName)
	if len(entry.podsToMount) == 0 {
		delete(dsw.volumesToMount, volumeName)
	}
}
desiredStateOfWorld::VolumeExists()

VolumeExists()返回volume是否在desiredStateOfWorld中。
// VolumeExists reports whether any pod currently wants volumeName mounted.
func (dsw *desiredStateOfWorld) VolumeExists(
	volumeName api.UniqueVolumeName) bool {
	dsw.RLock()
	defer dsw.RUnlock()

	if _, known := dsw.volumesToMount[volumeName]; known {
		return true
	}
	return false
}
desiredStateOfWorld::PodExistsInVolume()

PodExistsInVolume()返回pod是否需要挂载某volume。
// PodExistsInVolume reports whether podName is among the pods that want
// volumeName mounted. It returns false when the volume is unknown.
func (dsw *desiredStateOfWorld) PodExistsInVolume(
	podName types.UniquePodName, volumeName api.UniqueVolumeName) bool {
	dsw.RLock()
	defer dsw.RUnlock()

	if entry, known := dsw.volumesToMount[volumeName]; known {
		_, wanted := entry.podsToMount[podName]
		return wanted
	}
	return false
}
desiredStateOfWorldPopulator

有了desiredStateOfWorld后,我们还需要一种机制来保证desiredStateOfWorld中的内容始终是最新的,这个机制就是desiredStateOfWorldPopulator。desiredStateOfWorldPopulator定义在/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go中:
// desiredStateOfWorldPopulator periodically syncs the desired state of world
// with the pods known to the pod manager: it adds volumes of new pods and
// removes volumes of deleted (or, for some volume types, terminated) pods.
type desiredStateOfWorldPopulator struct {
// kubeClient is the API client; presumably used by createVolumeSpec to
// resolve claims (not shown in this excerpt).
kubeClient internalclientset.Interface
// loopSleepDuration is the period at which Run executes the populator loop.
loopSleepDuration time.Duration
// getPodStatusRetryDuration is the minimum time between successful runtime
// queries before findAndRemoveDeletedPods may run again.
getPodStatusRetryDuration time.Duration
// podManager is the source of the pods whose volumes are populated.
podManager pod.Manager
// desiredStateOfWorld is the cache this populator writes to.
desiredStateOfWorld cache.DesiredStateOfWorld
// pods records which pods have already been processed (accessed via
// podPreviouslyProcessed/markPodProcessed/deleteProcessedPod, not shown here).
pods processedPods
// kubeContainerRuntime is queried for the pods that still have containers.
kubeContainerRuntime kubecontainer.Runtime
// timeOfLastGetPodStatus is the time of the last successful runtime GetPods call.
timeOfLastGetPodStatus time.Time
}
可以看到,desiredStateOfWorldPopulator中有desiredStateOfWorld。desiredStateOfWorldPopulator的启动函数为Run(),定义如下:
// Run executes the populator loop every loopSleepDuration (via wait.Until)
// until stopCh is closed.
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
	loop := dswp.populatorLoopFunc()
	wait.Until(loop, dswp.loopSleepDuration, stopCh)
}
可以看出,Run()主要周期性地执行populatorLoopFunc()返回的函数:
// populatorLoopFunc returns one iteration of the populator loop.
//
// Each iteration adds volumes for new pods. Removing volumes of deleted pods
// queries the container runtime, so it only runs once getPodStatusRetryDuration
// has elapsed since the last successful runtime query.
func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
	return func() {
		dswp.findAndAddNewPods()

		if time.Since(dswp.timeOfLastGetPodStatus) >= dswp.getPodStatusRetryDuration {
			dswp.findAndRemoveDeletedPods()
			return
		}

		glog.V(5).Infof(
			"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
			dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
			dswp.getPodStatusRetryDuration)
	}
}
populatorLoopFunc()的功能有两个:
往desiredStateOfWorld中添加新的pod,由findAndAddNewPods()完成;
从desiredStateOfWorld删除已经删除的pod,由findAndRemoveDeletedPods()完成。
从这里也可以看出,populator只会添加新pod的volume、删除已删除pod的volume,而不会重新处理已经处理过的pod(见下文processPodVolumes()),所以pod创建后无法再改变它的volume挂载。
findAndAddNewPods()定义如下:
// findAndAddNewPods processes the volumes of every pod in the pod manager
// that has not terminated.
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
	for _, p := range dswp.podManager.GetPods() {
		if !isPodTerminated(p) {
			dswp.processPodVolumes(p)
		}
	}
}
// processPodVolumes adds every volume referenced by pod to the desired state
// of world. Pods that were already fully processed are skipped.
//
// The pod is marked as processed only when ALL of its volumes were added.
// Marking it unconditionally would mean a volume whose spec could not be
// built on the first attempt (presumably e.g. a claim that is not bound yet —
// createVolumeSpec is not shown here) is never retried for the lifetime of the
// pod. Leaving the pod unmarked lets the next populator iteration retry it;
// re-adding volumes that already succeeded just overwrites the same
// podsToMount entry in the desired state.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *api.Pod) {
	if pod == nil {
		return
	}

	uniquePodName := volumehelper.GetUniquePodName(pod)
	if dswp.podPreviouslyProcessed(uniquePodName) {
		return
	}

	allVolumesAdded := true
	for _, podVolume := range pod.Spec.Volumes {
		volumeSpec, volumeGidValue, err :=
			dswp.createVolumeSpec(podVolume, pod.Namespace)
		if err != nil {
			glog.Errorf(
				"Error processing volume %q for pod %q: %v",
				podVolume.Name,
				format.Pod(pod),
				err)
			allVolumesAdded = false
			continue
		}

		_, err = dswp.desiredStateOfWorld.AddPodToVolume(
			uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
		if err != nil {
			glog.Errorf(
				"Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v",
				podVolume.Name,
				volumeSpec.Name(),
				uniquePodName,
				err)
			allVolumesAdded = false
			// Do not fall through to the "Added volume" message below.
			continue
		}

		glog.V(10).Infof(
			"Added volume %q (volSpec=%q) for pod %q to desired state.",
			podVolume.Name,
			volumeSpec.Name(),
			uniquePodName)
	}

	if allVolumesAdded {
		dswp.markPodProcessed(uniquePodName)
	}
}
findAndAddNewPods()从podManager处获取所有pod,跳过已终止的pod,然后调用processPodVolumes()处理每个pod。processPodVolumes()会为pod的每个volume生成volumeSpec,并调用desiredStateOfWorld的AddPodToVolume()把pod加入到desiredStateOfWorld中。desiredStateOfWorldPopulator会对处理过的pod进行记录,避免同一个pod被重复处理。
再来看findAndRemoveDeletedPods():
// findAndRemoveDeletedPods removes (pod, volume) pairs from the desired state
// of world when the pod should no longer have the volume mounted.
//
// A pair is a removal candidate when either:
//   - the pod is no longer known to the pod manager, or
//   - the pod is terminated and the volume is a memory-medium emptyDir, a
//     ConfigMap or a Secret (other volumes of terminated pods are kept).
//
// A candidate is only removed once the container runtime reports no
// containers for that pod; the pod is then also dropped from the processed-pod
// record. The runtime pod list is fetched lazily, at most once successfully per
// call, and a successful fetch updates timeOfLastGetPodStatus. If the fetch
// fails, the error is logged, the current candidate is skipped, and the fetch
// is retried for the next candidate.
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods () {
var runningPods []*kubecontainer.Pod
runningPodsFetched := false
for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
if podExists {
// Volumes of pods that are still alive are always kept.
if !isPodTerminated(pod) {
continue
}
volume := volumeToMount.VolumeSpec.Volume
if volume == nil {
continue
}
// For terminated pods, only memory-backed emptyDir, ConfigMap and Secret
// volumes are candidates for removal.
if (volume.EmptyDir == nil || volume.EmptyDir.Medium != api.StorageMediumMemory) &&
volume.ConfigMap == nil && volume.Secret == nil {
continue
}
}
// Query the runtime lazily: only once a removal candidate is found.
if !runningPodsFetched {
var getPodsErr error
// NOTE(review): the false argument presumably restricts the list to
// running pods — confirm against kubecontainer.Runtime.GetPods.
runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false )
if getPodsErr != nil {
glog.Errorf(
"kubeContainerRuntime.findAndRemoveDeletedPods returned error %v." ,
getPodsErr)
continue
}
runningPodsFetched = true
dswp.timeOfLastGetPodStatus = time.Now()
}
// Keep the volume while the runtime still lists containers for the pod.
runningContainers := false
for _, runningPod := range runningPods {
if runningPod.ID == volumeToMount.Pod.UID {
if len (runningPod.Containers) > 0 {
runningContainers = true
}
break
}
}
if runningContainers {
glog.V(5 ).Infof(
"Pod %q has been removed from pod manager. However, it still has one or more containers in the non-exited state. Therefore, it will not be removed from volume manager." ,
format.Pod(volumeToMount.Pod))
continue
}
glog.V(5 ).Infof(
"Removing volume %q (volSpec=%q) for pod %q from desired state." ,
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
format.Pod(volumeToMount.Pod))
dswp.desiredStateOfWorld.DeletePodFromVolume(
volumeToMount.PodName, volumeToMount.VolumeName)
// Forget the pod so that it would be processed afresh if it reappears.
dswp.deleteProcessedPod(volumeToMount.PodName)
}
}
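findAndRemoveDeletedPods()遍历desiredStateOfWorld中所有待挂载的volume。以下两种情况会被视为待删除:对应的pod已不在podManager中;或者pod已终止,且该volume是内存型emptyDir、ConfigMap或Secret。对于这些待删除项,在确认容器运行时中该pod已没有容器后,把该pod从desiredStateOfWorld中删除,并把pod从已处理记录表中删除。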
所以,现在我们已经完成了podManager和desiredStateOfWorld的同步。
reconciler

现在还差两方面的一致性维护:一是actualStateOfWorld与desiredStateOfWorld之间的一致性,二是物理机上的实际挂载与actualStateOfWorld之间的一致性。这些都由reconciler完成。reconciler定义在/pkg/kubelet/volumemanager/reconciler/reconciler.go中:
// reconciler periodically drives the actual state of world toward the desired
// state of world by starting attach/detach/mount/unmount operations, and
// periodically reconstructs state from the kubelet pods directory on disk.
type reconciler struct {
// kubeClient is the API client; its use is not shown in this excerpt.
kubeClient internalclientset.Interface
// controllerAttachDetachEnabled: when true, the kubelet only verifies
// attachment instead of attaching, and only marks volumes detached instead of
// detaching them.
controllerAttachDetachEnabled bool
// loopSleepDuration is the period of the reconciliation loop.
loopSleepDuration time.Duration
// syncDuration is the minimum interval between state reconstructions (sync).
syncDuration time.Duration
// waitForAttachTimeout is passed to MountVolume operations.
waitForAttachTimeout time.Duration
// nodeName is the node passed to attach/verify/detach operations.
nodeName types.NodeName
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
// operationExecutor starts the individual volume operations.
operationExecutor operationexecutor.OperationExecutor
// mounter is passed to UnmountDevice operations.
mounter mount.Interface
// volumePluginMgr is presumably used when reconstructing volumes from disk
// (reconstructVolume is not shown here).
volumePluginMgr *volumepkg.VolumePluginMgr
// kubeletPodsDir is the directory scanned by sync to reconstruct state.
kubeletPodsDir string
// timeOfLastSync is compared against syncDuration to throttle sync.
timeOfLastSync time.Time
}
reconciler中有desiredStateOfWorld和actualStateOfWorld,以及用来执行具体挂载操作的operationExecutor。reconciler的启动方法如下:
// Run executes the reconciliation loop every loopSleepDuration (via
// wait.Until) until stopCh is closed. sourcesReady gates the periodic state
// reconstruction performed by the loop.
func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	loop := rc.reconciliationLoopFunc(sourcesReady)
	wait.Until(loop, rc.loopSleepDuration, stopCh)
}
reconciliationLoopFunc()定义如下:
// reconciliationLoopFunc returns one iteration of the reconciler loop.
//
// Every iteration reconciles the actual and desired states. State
// reconstruction from disk (sync) additionally runs only after all pod
// sources are ready, and at most once per syncDuration.
func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) func() {
	return func() {
		rc.reconcile()

		if !sourcesReady.AllReady() {
			return
		}
		if time.Since(rc.timeOfLastSync) <= rc.syncDuration {
			return
		}
		glog.V(5).Infof("Sources are all ready, starting reconstruct state function")
		rc.sync()
	}
}
reconciliationLoopFunc()有两个功能:
处理actualStateOfWorld和desiredStateOfWorld的一致性,由reconcile()完成;
处理磁盘volume和actual state of world,desired state of world的一致性,由sync()完成。
先来看reconcile(),定义如下:
// reconcile brings the actual state of world in line with the desired state
// of world. It runs three passes, in this order:
//
//  1. start an unmount for every mounted (pod, volume) pair that the desired
//     state no longer contains;
//  2. for every (pod, volume) pair in the desired state, start an attach (or a
//     verification of a controller-performed attach) when the volume is not
//     attached, or a mount when it is not mounted to the pod or a remount was
//     requested;
//  3. for every unmounted volume the desired state no longer references, start
//     an UnmountDevice if it is globally mounted, otherwise detach it.
//
// Operations are only started here through operationExecutor (the logs report
// "operation started"); the executor is handed rc.actualStateOfWorld,
// presumably to record the results. Errors saying an operation already exists
// or is in exponential backoff are expected and are not logged.
func (rc *reconciler) reconcile() {
	rc.reconcileUnmountVolumes()
	rc.reconcileMountAttachVolumes()
	rc.reconcileUnmountDetachDevices()
}

// isIgnorableReconcileError reports whether err returned when starting an
// operation is one the reconciler deliberately does not log: per the helper
// names, the operation already exists (is pending) or is in exponential backoff.
func isIgnorableReconcileError(err error) bool {
	return nestedpendingoperations.IsAlreadyExists(err) ||
		exponentialbackoff.IsExponentialBackoff(err)
}

// reconcileUnmountVolumes starts an unmount for every (pod, volume) pair that
// is mounted in the actual state but absent from the desired state.
func (rc *reconciler) reconcileUnmountVolumes() {
	for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
		if rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
			continue
		}
		glog.V(12).Infof("Attempting to start UnmountVolume for volume %q (spec.Name: %q) from pod %q (UID: %q).",
			mountedVolume.VolumeName,
			mountedVolume.OuterVolumeSpecName,
			mountedVolume.PodName,
			mountedVolume.PodUID)
		err := rc.operationExecutor.UnmountVolume(
			mountedVolume.MountedVolume, rc.actualStateOfWorld)
		switch {
		case err == nil:
			glog.Infof("UnmountVolume operation started for volume %q (spec.Name: %q) from pod %q (UID: %q).",
				mountedVolume.VolumeName,
				mountedVolume.OuterVolumeSpecName,
				mountedVolume.PodName,
				mountedVolume.PodUID)
		case !isIgnorableReconcileError(err):
			glog.Errorf(
				"operationExecutor.UnmountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
				mountedVolume.VolumeName,
				mountedVolume.OuterVolumeSpecName,
				mountedVolume.PodName,
				mountedVolume.PodUID,
				rc.controllerAttachDetachEnabled,
				err)
		}
	}
}

// reconcileMountAttachVolumes starts an attach (or attach verification) or a
// mount for every (pod, volume) pair in the desired state that the actual
// state does not yet satisfy.
func (rc *reconciler) reconcileMountAttachVolumes() {
	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
		volMounted, devicePath, existsErr := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
		volumeToMount.DevicePath = devicePath

		if cache.IsVolumeNotAttachedError(existsErr) {
			if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
				// The kubelet does not attach this volume itself; it only
				// verifies that it is attached.
				glog.V(12).Infof("Attempting to start VerifyControllerAttachedVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
					volumeToMount.VolumeName,
					volumeToMount.VolumeSpec.Name(),
					volumeToMount.PodName,
					volumeToMount.Pod.UID)
				opErr := rc.operationExecutor.VerifyControllerAttachedVolume(
					volumeToMount.VolumeToMount,
					rc.nodeName,
					rc.actualStateOfWorld)
				switch {
				case opErr == nil:
					glog.Infof("VerifyControllerAttachedVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
						volumeToMount.VolumeName,
						volumeToMount.VolumeSpec.Name(),
						volumeToMount.PodName,
						volumeToMount.Pod.UID)
				case !isIgnorableReconcileError(opErr):
					glog.Errorf(
						"operationExecutor.VerifyControllerAttachedVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
						volumeToMount.VolumeName,
						volumeToMount.VolumeSpec.Name(),
						volumeToMount.PodName,
						volumeToMount.Pod.UID,
						rc.controllerAttachDetachEnabled,
						opErr)
				}
				continue
			}

			volumeToAttach := operationexecutor.VolumeToAttach{
				VolumeName: volumeToMount.VolumeName,
				VolumeSpec: volumeToMount.VolumeSpec,
				NodeName:   rc.nodeName,
			}
			glog.V(12).Infof("Attempting to start AttachVolume for volume %q (spec.Name: %q) pod %q (UID: %q)",
				volumeToMount.VolumeName,
				volumeToMount.VolumeSpec.Name(),
				volumeToMount.PodName,
				volumeToMount.Pod.UID)
			opErr := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
			switch {
			case opErr == nil:
				glog.Infof("AttachVolume operation started for volume %q (spec.Name: %q) pod %q (UID: %q)",
					volumeToMount.VolumeName,
					volumeToMount.VolumeSpec.Name(),
					volumeToMount.PodName,
					volumeToMount.Pod.UID)
			case !isIgnorableReconcileError(opErr):
				glog.Errorf(
					"operationExecutor.AttachVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
					volumeToMount.VolumeName,
					volumeToMount.VolumeSpec.Name(),
					volumeToMount.PodName,
					volumeToMount.Pod.UID,
					rc.controllerAttachDetachEnabled,
					opErr)
			}
			continue
		}

		remountRequested := cache.IsRemountRequiredError(existsErr)
		if volMounted && !remountRequested {
			continue
		}

		remountingLogStr := ""
		if remountRequested {
			remountingLogStr = "Volume is already mounted to pod, but remount was requested."
		}
		glog.V(12).Infof("Attempting to start MountVolume for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
			volumeToMount.VolumeName,
			volumeToMount.VolumeSpec.Name(),
			volumeToMount.PodName,
			volumeToMount.Pod.UID,
			remountingLogStr)
		opErr := rc.operationExecutor.MountVolume(
			rc.waitForAttachTimeout,
			volumeToMount.VolumeToMount,
			rc.actualStateOfWorld)
		switch {
		case opErr == nil:
			logMsg := fmt.Sprintf("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s",
				volumeToMount.VolumeName,
				volumeToMount.VolumeSpec.Name(),
				volumeToMount.PodName,
				volumeToMount.Pod.UID,
				remountingLogStr)
			// logMsg is already formatted: pass it to Info, not Infof, so any
			// '%' in names cannot be misinterpreted as a format verb.
			// First-time mounts are logged at V(1), remounts at V(5).
			if remountingLogStr == "" {
				glog.V(1).Info(logMsg)
			} else {
				glog.V(5).Info(logMsg)
			}
		case !isIgnorableReconcileError(opErr):
			glog.Errorf(
				"operationExecutor.MountVolume failed for volume %q (spec.Name: %q) pod %q (UID: %q) controllerAttachDetachEnabled: %v with err: %v",
				volumeToMount.VolumeName,
				volumeToMount.VolumeSpec.Name(),
				volumeToMount.PodName,
				volumeToMount.Pod.UID,
				rc.controllerAttachDetachEnabled,
				opErr)
		}
	}
}

// reconcileUnmountDetachDevices handles unmounted volumes that the desired
// state no longer references: globally mounted ones get an UnmountDevice;
// otherwise the volume is detached (or, when the kubelet does not detach it
// itself, simply marked detached in the actual state).
func (rc *reconciler) reconcileUnmountDetachDevices() {
	for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
		if rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) {
			continue
		}

		switch {
		case attachedVolume.GloballyMounted:
			glog.V(12).Infof("Attempting to start UnmountDevice for volume %q (spec.Name: %q)",
				attachedVolume.VolumeName,
				attachedVolume.VolumeSpec.Name())
			err := rc.operationExecutor.UnmountDevice(
				attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
			switch {
			case err == nil:
				glog.Infof("UnmountDevice operation started for volume %q (spec.Name: %q)",
					attachedVolume.VolumeName,
					attachedVolume.VolumeSpec.Name())
			case !isIgnorableReconcileError(err):
				glog.Errorf(
					"operationExecutor.UnmountDevice failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
					attachedVolume.VolumeName,
					attachedVolume.VolumeSpec.Name(),
					rc.controllerAttachDetachEnabled,
					err)
			}

		case rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable:
			// The kubelet issues no detach here; it only drops the volume
			// from the actual state.
			rc.actualStateOfWorld.MarkVolumeAsDetached(
				attachedVolume.VolumeName, rc.nodeName)

		default:
			glog.V(12).Infof("Attempting to start DetachVolume for volume %q (spec.Name: %q)",
				attachedVolume.VolumeName,
				attachedVolume.VolumeSpec.Name())
			err := rc.operationExecutor.DetachVolume(
				attachedVolume.AttachedVolume, false, rc.actualStateOfWorld)
			switch {
			case err == nil:
				glog.Infof("DetachVolume operation started for volume %q (spec.Name: %q)",
					attachedVolume.VolumeName,
					attachedVolume.VolumeSpec.Name())
			case !isIgnorableReconcileError(err):
				glog.Errorf(
					"operationExecutor.DetachVolume failed for volume %q (spec.Name: %q) controllerAttachDetachEnabled: %v with err: %v",
					attachedVolume.VolumeName,
					attachedVolume.VolumeSpec.Name(),
					rc.controllerAttachDetachEnabled,
					err)
			}
		}
	}
}
reconcile()主要处理以下三种情况:
actualStateOfWorld中有,但desiredStateOfWorld中没有,则调用operationExecutor.UnmountVolume()执行unmount操作;
desiredStateOfWorld中有,但在actualStateOfWorld中volume尚未attach、尚未挂载到对应pod或需要重新挂载(remount)的,则执行attach操作(或校验由controller完成的attach)或mount操作;
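对actualStateOfWorld中的unmounted volume(由GetUnmountedVolumes()返回)进行检查。如果desiredStateOfWorld中没有该volume,分三种情况处理:若该volume已全局挂载(GloballyMounted),则调用operationExecutor.UnmountDevice();若启用了attach/detach controller或插件不支持attach,则只通过MarkVolumeAsDetached()把volume从actualStateOfWorld中删除;其余情况调用operationExecutor.DetachVolume()执行detach。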
再来看sync():
// sync reconstructs volume state from the kubelet pods directory and, when it
// returns, records the time of this sync (used to throttle later syncs).
func (rc *reconciler) sync() {
	podsDir := rc.kubeletPodsDir
	defer rc.updateLastSyncTime()
	rc.syncStates(podsDir)
}
sync()主要调用了syncStates():
// syncStates scans podsDir for volumes present on disk and pushes those that
// the in-memory states are missing into both states via updateStates.
//
// Before the states have been synced once, an on-disk volume is picked up only
// if it is attachable and not already in the actual state. Afterwards, a
// volume is picked up only if it has no pending operation and is in neither
// the desired nor the actual state. Volumes that cannot be reconstructed are
// logged and skipped.
func (rc *reconciler) syncStates(podsDir string) {
	podVolumes, err := getVolumesFromPodDir(podsDir)
	if err != nil {
		glog.Errorf("Cannot get volumes from disk %v", err)
		return
	}

	toUpdate := make(map[api.UniqueVolumeName]*reconstructedVolume)
	for _, podVolume := range podVolumes {
		rv, rvErr := rc.reconstructVolume(podVolume)
		if rvErr != nil {
			glog.Errorf("Could not construct volume information: %v", rvErr)
			continue
		}

		pending := rc.operationExecutor.IsOperationPending(rv.volumeName, rv.podName)
		inDesired := rc.desiredStateOfWorld.PodExistsInVolume(rv.podName, rv.volumeName)
		inActual, _, _ := rc.actualStateOfWorld.PodExistsInVolume(rv.podName, rv.volumeName)

		var skip bool
		if rc.StatesHasBeenSynced() {
			skip = pending || inDesired || inActual
		} else {
			skip = inActual || !rv.pluginIsAttachable
		}
		if skip {
			continue
		}

		glog.V(2).Infof(
			"Reconciler sync states: could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
			rv)
		toUpdate[rv.volumeName] = rv
	}

	if len(toUpdate) == 0 {
		return
	}
	if err := rc.updateStates(toUpdate); err != nil {
		glog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
	}
}
syncStates()会从磁盘上获取volume信息,然后排除状态为pending,或desiredStateOfWorld中存在,或actualStateOfWorld中存在的volume挂载,然后通过updateStates()方法把缺失的挂载加到desiredStateOfWorld和actualStateOfWorld中,以后的事情就交给populator及reconciler来处理了,如果该volume挂载是多余的,也会被删除的。
volumeManager

最后来看volumeManager,它负责管理物理节点上的volume挂载。volumeManager定义在/pkg/kubelet/volumemanager/volume_manager.go中:
// volumeManager ties together the desired/actual state caches, the operation
// executor, the reconciler and the desired-state populator for this node.
type volumeManager struct {
// kubeClient is the API client shared with the reconciler, populator and executor.
kubeClient internalclientset.Interface
// volumePluginMgr resolves volume plugins for all components.
volumePluginMgr *volume.VolumePluginMgr
// desiredStateOfWorld is filled by the populator and read by the reconciler.
desiredStateOfWorld cache.DesiredStateOfWorld
// actualStateOfWorld records what is currently attached/mounted.
actualStateOfWorld cache.ActualStateOfWorld
// operationExecutor starts the individual volume operations.
operationExecutor operationexecutor.OperationExecutor
// reconciler drives the actual state toward the desired state.
reconciler reconciler.Reconciler
// desiredStateOfWorldPopulator keeps the desired state in sync with the pod manager.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
}
volumeManager中有desiredStateOfWorld,actualStateOfWorld,operationExecutor,reconciler及desiredStateOfWorldPopulator。
volumeManager的生成方法如下:
// NewVolumeManager builds a VolumeManager for nodeName.
//
// It creates empty desired/actual state caches and an operation executor, then
// wires a reconciler and a desired-state populator around them. The returned
// error is always nil.
func NewVolumeManager(
	controllerAttachDetachEnabled bool,
	nodeName k8stypes.NodeName,
	podManager pod.Manager,
	kubeClient internalclientset.Interface,
	volumePluginMgr *volume.VolumePluginMgr,
	kubeContainerRuntime kubecontainer.Runtime,
	mounter mount.Interface,
	kubeletPodsDir string,
	recorder record.EventRecorder,
	checkNodeCapabilitiesBeforeMount bool) (VolumeManager, error) {
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	executor := operationexecutor.NewOperationExecutor(
		kubeClient,
		volumePluginMgr,
		recorder,
		checkNodeCapabilitiesBeforeMount)

	vm := &volumeManager{
		kubeClient:          kubeClient,
		volumePluginMgr:     volumePluginMgr,
		desiredStateOfWorld: dsw,
		actualStateOfWorld:  asw,
		operationExecutor:   executor,
	}

	vm.reconciler = reconciler.NewReconciler(
		kubeClient,
		controllerAttachDetachEnabled,
		reconcilerLoopSleepPeriod,
		reconcilerSyncStatesSleepPeriod,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		executor,
		mounter,
		volumePluginMgr,
		kubeletPodsDir)

	vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
		kubeClient,
		desiredStateOfWorldPopulatorLoopSleepPeriod,
		desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
		podManager,
		dsw,
		kubeContainerRuntime)

	return vm, nil
}
volumeManager启动方法如下:
// Run starts the desired-state populator and then the reconciler, each in its
// own goroutine sharing stopCh, and blocks until stopCh is closed.
// runtime.HandleCrash is deferred to handle panics in this goroutine.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()

	// Populator first, so the desired state starts filling before reconciling.
	go vm.desiredStateOfWorldPopulator.Run(stopCh)
	glog.V(2).Infof("The desired_state_of_world populator starts")

	glog.Infof("Starting Kubelet Volume Manager")
	go vm.reconciler.Run(sourcesReady, stopCh)

	<-stopCh
	glog.Infof("Shutting down Kubelet Volume Manager")
}
Run()会依次启动desiredStateOfWorldPopulator及reconciler。
总结

volumeManager通过actualStateOfWorld和desiredStateOfWorld分别表示当前的volume挂载状态和期望的volume挂载状态。desiredStateOfWorldPopulator维护desiredStateOfWorld和podManager的一致性;reconciler维护actualStateOfWorld和desiredStateOfWorld的一致性,以及磁盘上的volume挂载和actualStateOfWorld的一致性。通过这些机制,volumeManager完成了volume挂载生命周期的管理。