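In Kubernetes, the kubelet is the component that deals directly with the physical machine. It manages the resources behind each pod, such as containers, volumes and networking. This article looks at the kubelet's config package, that is, how the kubelet synchronizes pod information into itself from the apiserver, from files, or over HTTP. The apiserver source reflects pod changes stored in etcd. The file source picks up changes from manifest files on disk, such as static pods. The http source fetches pod manifests from a URL (ManifestURL), though I have not used it myself.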

Mux

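Mux was already covered in the earlier article "kube-proxy Analysis (Part 1): config (v1.5.2)", and that section is reproduced here.
A Mux can create multiple channels and, together with the storage of the corresponding config, merges the data arriving on those channels. Mux is defined in /pkg/util/config/config.go: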

// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
	// Invoked when an update is sent to a source.
	merger Merger
	// Sources and their lock.
	sourceLock sync.RWMutex
	// Maps source names to channels
	sources map[string]chan interface{}
}

Here, merger is responsible for merging the data, and sources maps each source name to its channel.
A Mux is created with NewMux():

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}

A Merger must be supplied when the Mux is created, while sources starts out empty. So how is a source channel created, and how does its content reach the Merger? That is the job of the Mux's Channel() method:

func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}

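If a channel is already registered for the source, Channel() returns it. Otherwise it creates a new channel, starts a goroutine that runs listen() on it, and returns the channel. listen() ranges over that channel and passes every item to merger.Merge(). So a caller only has to obtain a channel through the Mux's Channel() method and write to it, and the Mux hands the data to the merger automatically.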
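The pattern is easier to see in isolation. Below is a minimal, dependency-free sketch of the same idea, not the Kubernetes code: wait.Until is replaced by a plain goroutine, and recordingMerger is a hypothetical Merger that simply reports what it received on an output channel.

```go
package main

import (
	"fmt"
	"sync"
)

// Merger mirrors the interface discussed in this article.
type Merger interface {
	Merge(source string, update interface{}) error
}

// Mux is a simplified re-implementation: one channel per source,
// each drained by its own goroutine that forwards items to the merger.
type Mux struct {
	merger     Merger
	sourceLock sync.Mutex
	sources    map[string]chan interface{}
}

func NewMux(merger Merger) *Mux {
	return &Mux{merger: merger, sources: make(map[string]chan interface{})}
}

// Channel returns the existing channel for source, or creates one and starts
// a listener goroutine for it (a plain goroutine instead of wait.Until).
func (m *Mux) Channel(source string) chan interface{} {
	if source == "" {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	go m.listen(source, ch)
	return ch
}

// listen hands every item written to the source channel to the merger.
func (m *Mux) listen(source string, ch <-chan interface{}) {
	for update := range ch {
		m.merger.Merge(source, update)
	}
}

// recordingMerger is a hypothetical Merger that reports each merge on out.
type recordingMerger struct{ out chan string }

func (r recordingMerger) Merge(source string, update interface{}) error {
	r.out <- fmt.Sprintf("%s:%v", source, update)
	return nil
}

func main() {
	out := make(chan string)
	mux := NewMux(recordingMerger{out: out})
	go func() { mux.Channel("file") <- "pod-a" }()
	fmt.Println(<-out)                                      // file:pod-a
	fmt.Println(mux.Channel("file") == mux.Channel("file")) // true
}
```

Because the source channel is unbuffered, a write returns as soon as the listen goroutine has taken the item. The merge itself then runs in that goroutine, one goroutine per source.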

Next, what is a Merger? It is an interface, also defined in /pkg/util/config/config.go:

type Merger interface {
	// Invoked when a change from a source is received. May also function as an incremental
	// merger if you wish to consume changes incrementally. Must be reentrant when more than
	// one source is defined.
	Merge(source string, update interface{}) error
}

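Any type that implements a Merge() method counts as a Merger, which keeps things very simple. Note the reentrancy requirement in the comment: with several sources, each source's listen goroutine calls Merge() concurrently.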
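Go interfaces are satisfied implicitly, so any struct with a matching Merge method, or even a plain function behind an adapter, can serve as a Merger. The sketch below shows both. counter and MergeFunc are illustrative names chosen here, and the adapter follows the same pattern as net/http's HandlerFunc.

```go
package main

import "fmt"

type Merger interface {
	Merge(source string, update interface{}) error
}

// MergeFunc adapts an ordinary function to the Merger interface (illustrative).
type MergeFunc func(source string, update interface{}) error

func (f MergeFunc) Merge(source string, update interface{}) error {
	return f(source, update)
}

// counter is a struct Merger: having a Merge method is all it takes.
type counter struct{ perSource map[string]int }

func (c *counter) Merge(source string, update interface{}) error {
	c.perSource[source]++
	return nil
}

func main() {
	c := &counter{perSource: map[string]int{}}
	var m Merger = c
	m.Merge("api", nil)
	m.Merge("api", nil)
	fmt.Println(c.perSource["api"]) // 2

	var last string
	var f Merger = MergeFunc(func(source string, update interface{}) error {
		last = fmt.Sprintf("%s/%v", source, update)
		return nil
	})
	f.Merge("file", "pod-b")
	fmt.Println(last) // file/pod-b
}
```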

podStorage

podStorage is where the data arriving from all sources is aggregated, and it is the Merger the kubelet's config uses. It is defined in /pkg/kubelet/config/config.go:

type podStorage struct {
	podLock sync.RWMutex
	// map of source name to pod name to pod reference
	pods map[string]map[string]*api.Pod
	mode PodConfigNotificationMode
	// ensures that updates are delivered in strict order
	// on the updates channel
	updateLock sync.Mutex
	updates    chan<- kubetypes.PodUpdate
	// contains the set of all sources that have sent at least one SET
	sourcesSeenLock sync.RWMutex
	sourcesSeen     sets.String
	// the EventRecorder to use
	recorder record.EventRecorder
}

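podStorage has an updates field, the channel where all processed data is collected, and a pods field, which stores the known pods grouped first by source name and then by pod full name. Its workflow is therefore: the data written to the channels returned by the Mux reaches Merge() through the Mux's listen goroutines, Merge() processes it, and the resulting updates are sent to updates.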
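The two-level pods map is what lets each source be replaced independently: a SET from one source rebuilds only that source's inner map. Here is a toy version, with pods reduced to strings and a hypothetical store type standing in for podStorage. Keys use the "name_namespace" form of a pod full name.

```go
package main

import (
	"fmt"
	"sort"
)

// store mimics the shape of podStorage.pods: source -> pod full name -> pod.
type store struct {
	pods map[string]map[string]string
}

// set replaces everything known for one source and leaves the others untouched.
func (s *store) set(source string, pods map[string]string) {
	fresh := make(map[string]string, len(pods))
	for name, p := range pods {
		fresh[name] = p
	}
	s.pods[source] = fresh
}

// all flattens every source into a sorted list of "source/name" keys.
func (s *store) all() []string {
	var out []string
	for source, byName := range s.pods {
		for name := range byName {
			out = append(out, source+"/"+name)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	s := &store{pods: map[string]map[string]string{}}
	s.set("file", map[string]string{"etcd_kube-system": "static"})
	s.set("api", map[string]string{"web_default": "v1", "db_default": "v1"})
	// db_default disappears from the api source only; the file source is unaffected.
	s.set("api", map[string]string{"web_default": "v2"})
	fmt.Println(s.all()) // [api/web_default file/etcd_kube-system]
}
```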

Let us start with podStorage's Merge():

func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()
	seenBefore := s.sourcesSeen.Has(source)
	//***Call merge() to get adds, updates, deletes, removes and reconciles***//
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)
	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}
	......
	default:
		panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
	}
	return nil
}

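In the kubelet, podStorage runs in PodConfigNotificationIncremental mode. Merge() calls merge() to sort the data from the given source into five kubetypes.PodUpdate values: adds, updates, deletes, removes and reconciles. These are values, not channels. In incremental mode, Merge() then sends each non-empty one to s.updates in the order removes, adds, updates, deletes, reconciles. In addition, the first time a source delivers a SET that yields no adds, updates or deletes, an empty ADD update is sent to signal that the source is ready.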
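The delivery rules of the incremental branch can be isolated into a small function. This is a simplified sketch. PodUpdate here carries pod names instead of *api.Pod, Op is a plain string, and dispatch is a hypothetical name. The ordering and the "empty ADD on first SET" rule follow the Merge() code above.

```go
package main

import "fmt"

// PodUpdate is a simplified stand-in for kubetypes.PodUpdate.
type PodUpdate struct {
	Op   string
	Pods []string
}

// dispatch mirrors the PodConfigNotificationIncremental branch of Merge().
func dispatch(out chan<- PodUpdate, firstSet bool, adds, updates, deletes, removes, reconciles PodUpdate) {
	if len(removes.Pods) > 0 {
		out <- removes
	}
	if len(adds.Pods) > 0 {
		out <- adds
	}
	if len(updates.Pods) > 0 {
		out <- updates
	}
	if len(deletes.Pods) > 0 {
		out <- deletes
	}
	// Empty ADD the first time a source has SET nothing to add, update or delete.
	if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
		out <- adds
	}
	if len(reconciles.Pods) > 0 {
		out <- reconciles
	}
}

func main() {
	out := make(chan PodUpdate, 10) // buffered so dispatch does not block in this demo
	dispatch(out, true,
		PodUpdate{Op: "ADD"}, PodUpdate{Op: "UPDATE"}, PodUpdate{Op: "DELETE"},
		PodUpdate{Op: "REMOVE", Pods: []string{"old_default"}},
		PodUpdate{Op: "RECONCILE", Pods: []string{"web_default"}})
	close(out)
	for u := range out {
		fmt.Println(u.Op, u.Pods)
	}
	// REMOVE [old_default]
	// ADD []
	// RECONCILE [web_default]
}
```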

merge() is defined as follows:

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
	s.podLock.Lock()
	defer s.podLock.Unlock()
	//***One list of pods per category***//
	addPods := []*api.Pod{}
	updatePods := []*api.Pod{}
	deletePods := []*api.Pod{}
	removePods := []*api.Pod{}
	reconcilePods := []*api.Pod{}
	pods := s.pods[source]
	if pods == nil {
		pods = make(map[string]*api.Pod)
	}
	// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
	// After updated, new pod will be stored in the pod cache *pods*.
	// Notice that *pods* and *oldPods* could be the same cache.
	updatePodsFunc := func(newPods []*api.Pod, oldPods, pods map[string]*api.Pod) {
		//***Keep only valid pods***//
		filtered := filterInvalidPods(newPods, source, s.recorder)
		for _, ref := range filtered {
			name := kubecontainer.GetPodFullName(ref)
			// Annotate the pod with the source before any comparison.
			//***Record the source in the pod's Annotations***//
			//***? Yet this key does not show up in the pod's Annotations***//
			if ref.Annotations == nil {
				ref.Annotations = make(map[string]string)
			}
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
			if existing, found := oldPods[name]; found {
				pods[name] = existing
				needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
				if needUpdate {
					//***Add to updatePods***//
					updatePods = append(updatePods, existing)
				} else if needReconcile {
					//***Add to reconcilePods***//
					reconcilePods = append(reconcilePods, existing)
				} else if needGracefulDelete {
					//***Add to deletePods***//
					deletePods = append(deletePods, existing)
				}
				continue
			}
			recordFirstSeenTime(ref)
			pods[name] = ref
			//***Not seen before: add to addPods***//
			addPods = append(addPods, ref)
		}
	}
	update := change.(kubetypes.PodUpdate)
	switch update.Op {
	//***Handle ADD, UPDATE and DELETE***//
	case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
		if update.Op == kubetypes.ADD {
			glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
		} else if update.Op == kubetypes.DELETE {
			glog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
		} else {
			glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
		}
		updatePodsFunc(update.Pods, pods, pods)
	case kubetypes.REMOVE:
		glog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
		for _, value := range update.Pods {
			name := kubecontainer.GetPodFullName(value)
			if existing, found := pods[name]; found {
				// this is a delete
				delete(pods, name)
				removePods = append(removePods, existing)
				continue
			}
			// this is a no-op
		}
	//***In practice only this branch is taken***//
	case kubetypes.SET:
		glog.V(4).Infof("Setting pods for source %s", source)
		s.markSourceSet(source)
		// Clear the old map entries by just creating a new map
		oldPods := pods
		pods = make(map[string]*api.Pod)
		updatePodsFunc(update.Pods, oldPods, pods)
		//***Pods that existed before but are missing from the new list must be removed***//
		for name, existing := range oldPods {
			if _, found := pods[name]; !found {
				// this is a delete
				removePods = append(removePods, existing)
			}
		}
	default:
		glog.Warningf("Received invalid update type: %v", update)
	}
	s.pods[source] = pods
	//***Wrap each category in a PodUpdate with the matching Op***//
	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
	deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
	removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
	return adds, updates, deletes, removes, reconciles
}

// checkAndUpdatePod updates existing, and:
//   * if ref makes a meaningful change, returns needUpdate=true
//   * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
//   * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
//   * else return all false
//   Now, needUpdate, needGracefulDelete and needReconcile should never be both true
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
	// 1. this is a reconcile
	// TODO: it would be better to update the whole object and only preserve certain things
	//       like the source annotation or the UID (to ensure safety)
	if !podsDifferSemantically(existing, ref) {
		// this is not an update
		// Only check reconcile when it is not an update, because if the pod is going to
		// be updated, an extra reconcile is unnecessary
		//***Compare existing.Status with ref.Status; if they differ, this is a reconcile***//
		//***A reconcile only updates the status***//
		if !reflect.DeepEqual(existing.Status, ref.Status) {
			// Pod with changed pod status needs reconcile, because kubelet should
			// be the source of truth of pod status.
			existing.Status = ref.Status
			needReconcile = true
		}
		return
	}
	// Overwrite the first-seen time with the existing one. This is our own
	// internal annotation, there is no need to update.
	ref.Annotations[kubetypes.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]
	existing.Spec = ref.Spec
	existing.Labels = ref.Labels
	existing.DeletionTimestamp = ref.DeletionTimestamp
	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
	existing.Status = ref.Status
	updateAnnotations(existing, ref)
	// 2. this is an graceful delete
	//***If DeletionTimestamp is set, this is a graceful delete***//
	if ref.DeletionTimestamp != nil {
		needGracefulDelete = true
	} else {
		// 3. this is an update
		//***Otherwise it is an update***//
		needUpdate = true
	}
	return
}
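The decision part of checkAndUpdatePod() can be condensed as follows. The body of podsDifferSemantically() is not shown in this article, so for illustration the sketch assumes it compares Spec, Labels and the deletion timestamp. The pod type with string fields and the Deleting flag are stand-ins for api.Pod, and decide is a hypothetical name. Unlike the real function, it does not copy any fields into existing.

```go
package main

import "fmt"

// pod is a pared-down stand-in for api.Pod.
type pod struct {
	Spec     string
	Labels   string
	Status   string
	Deleting bool // stands in for DeletionTimestamp != nil
}

// decide returns at most one true flag, following checkAndUpdatePod's branches.
func decide(existing, ref pod) (needUpdate, needReconcile, needGracefulDelete bool) {
	// Assumed equivalent of !podsDifferSemantically(existing, ref).
	semanticallySame := existing.Spec == ref.Spec && existing.Labels == ref.Labels && existing.Deleting == ref.Deleting
	if semanticallySame {
		// Only a status change remains possible: that is a reconcile.
		needReconcile = existing.Status != ref.Status
		return
	}
	if ref.Deleting {
		needGracefulDelete = true
	} else {
		needUpdate = true
	}
	return
}

func main() {
	base := pod{Spec: "v1", Labels: "app=web", Status: "Running"}
	fmt.Println(decide(base, pod{Spec: "v1", Labels: "app=web", Status: "Pending"}))                 // false true false
	fmt.Println(decide(base, pod{Spec: "v2", Labels: "app=web", Status: "Running"}))                 // true false false
	fmt.Println(decide(base, pod{Spec: "v1", Labels: "app=web", Status: "Running", Deleting: true})) // false false true
}
```

A status change that accompanies a spec change is reported only as an update, because the reconcile check runs only when nothing semantic changed.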

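The updates that merge() consumes normally have update.Op set to kubetypes.SET, and the classification is done by updatePodsFunc.
Based on merge() and checkAndUpdatePod(), the five result sets can be summarized as follows:

  1. adds: pods the kubelet needs to add; their Op is kubetypes.ADD;
  2. updates: pods the kubelet needs to update; their Op is kubetypes.UPDATE;
  3. deletes: pods to be gracefully deleted, i.e. pods whose DeletionTimestamp has been set; their Op is kubetypes.DELETE;
  4. removes: pods the kubelet must remove because they exist on the kubelet side but are no longer present in the source; their Op is kubetypes.REMOVE;
  5. reconciles: pods whose spec is unchanged but whose status differs, so only the status needs updating; their Op is kubetypes.RECONCILE.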

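Note that each SET update carries the complete set of pods for its source. updatePodsFunc classifies that full set, and each category is wrapped in a PodUpdate whose Op tells the kubelet what to do with those pods.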
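To see the five categories fall out of a single SET, here is a self-contained sketch of the classification. It simplifies heavily. Pods are keyed by full name in plain maps, only Spec, Status and a Deleting flag are compared (an assumed stand-in for podsDifferSemantically), invalid-pod filtering and annotations are skipped, and classifySet is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

type pod struct {
	Spec     string
	Status   string
	Deleting bool // stands in for DeletionTimestamp != nil
}

// classifySet sketches merge() for a SET: newPods is the complete list from one
// source, oldPods is what was known for that source before.
func classifySet(oldPods, newPods map[string]pod) (cats map[string][]string, next map[string]pod) {
	cats = map[string][]string{}
	next = make(map[string]pod, len(newPods))
	for name, ref := range newPods {
		existing, found := oldPods[name]
		next[name] = ref
		switch {
		case !found:
			cats["ADD"] = append(cats["ADD"], name)
		case existing.Spec == ref.Spec && existing.Deleting == ref.Deleting:
			if existing.Status != ref.Status {
				cats["RECONCILE"] = append(cats["RECONCILE"], name)
			}
		case ref.Deleting:
			cats["DELETE"] = append(cats["DELETE"], name)
		default:
			cats["UPDATE"] = append(cats["UPDATE"], name)
		}
	}
	// Known before but absent from the full new list: REMOVE.
	for name := range oldPods {
		if _, ok := newPods[name]; !ok {
			cats["REMOVE"] = append(cats["REMOVE"], name)
		}
	}
	for _, names := range cats {
		sort.Strings(names) // map iteration order is random; sort for stable output
	}
	return cats, next
}

func main() {
	old := map[string]pod{
		"web_default":   {Spec: "v1", Status: "Running"},
		"db_default":    {Spec: "v1", Status: "Running"},
		"cache_default": {Spec: "v1", Status: "Running"},
		"job_default":   {Spec: "v1", Status: "Running"},
		"old_default":   {Spec: "v1", Status: "Running"},
	}
	full := map[string]pod{
		"web_default":   {Spec: "v2", Status: "Running"},                 // spec changed
		"db_default":    {Spec: "v1", Status: "Failed"},                  // status only
		"cache_default": {Spec: "v1", Status: "Running", Deleting: true}, // being deleted
		"job_default":   {Spec: "v1", Status: "Running"},                 // unchanged
		"new_default":   {Spec: "v1"},                                    // brand new
	}
	cats, next := classifySet(old, full)
	for _, op := range []string{"ADD", "UPDATE", "DELETE", "REMOVE", "RECONCILE"} {
		fmt.Println(op, cats[op])
	}
	fmt.Println(len(next))
	// ADD [new_default]
	// UPDATE [web_default]
	// DELETE [cache_default]
	// REMOVE [old_default]
	// RECONCILE [db_default]
	// 5
}
```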

PodConfig

PodConfig manages pod synchronization as a whole. It is defined in /pkg/kubelet/config/config.go:

// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
	pods *podStorage
	mux  *config.Mux
	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate
	// contains the list of all configured sources
	sourcesLock sync.Mutex
	sources     sets.String
}

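PodConfig has the following main fields:

  1. pods: a podStorage that aggregates the data;
  2. mux: the Mux;
  3. updates: the same channel as the podStorage's updates field; every processed update ends up here for external consumers;
  4. sources: the names of the sources registered with this config.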
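The point of the third field is that PodConfig and its podStorage share one channel. A sketch of that wiring, assuming a toy storage type whose Merge just forwards updates. The mux is left out, and the buffer size is an arbitrary choice for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// PodUpdate is a simplified stand-in for kubetypes.PodUpdate.
type PodUpdate struct {
	Op     string
	Pods   []string
	Source string
}

// storage stands in for podStorage: it only keeps the send side of updates.
type storage struct {
	updates chan<- PodUpdate
}

// Merge skips all classification and forwards the update tagged with its source.
func (s *storage) Merge(source string, change interface{}) error {
	u := change.(PodUpdate)
	u.Source = source
	s.updates <- u
	return nil
}

// PodConfig keeps the field names of the real struct, minus mux.
type PodConfig struct {
	pods        *storage
	updates     chan PodUpdate
	sourcesLock sync.Mutex
	sources     map[string]bool
}

// newPodConfig creates one channel and hands it to both sides:
// storage writes to it, PodConfig.updates is what consumers read.
func newPodConfig() *PodConfig {
	updates := make(chan PodUpdate, 50)
	return &PodConfig{
		pods:    &storage{updates: updates},
		updates: updates,
		sources: map[string]bool{},
	}
}

func main() {
	cfg := newPodConfig()
	cfg.pods.Merge("api", PodUpdate{Op: "SET", Pods: []string{"web_default"}})
	u := <-cfg.updates
	fmt.Println(u.Source, u.Op, u.Pods) // api SET [web_default]
}
```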

PodConfig provides a Channel() method:

//***Returns a channel for source; whatever is put into it is forwarded by the mux to Merge()***//
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	//***Delegate to mux.Channel()***//
	//***The mux consumes the objects written to the new channel and finally calls Merge()***//
	return c.mux.Channel(source)
}

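Channel() records the source name and calls the mux's Channel(), which creates the source's channel and starts the goroutine that feeds Merge(). This sets the whole merge flow in motion.

How, then, does an outside consumer get hold of updates, the channel where the processed, aggregated data collects? For that, PodConfig also implements Updates():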

// Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
	return c.updates
}

Updates() is trivial: it simply returns the updates channel, typed as receive-only.
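Putting Channel() and Updates() together, the producer/consumer flow looks roughly like the sketch below. It is a compressed illustration rather than the real PodConfig: the mux and podStorage are folded into one goroutine per source that only tags each update with its source name and forwards it.

```go
package main

import (
	"fmt"
	"sync"
)

type PodUpdate struct {
	Op     string
	Pods   []string
	Source string
}

// PodConfig here merges Mux and podStorage into a few lines.
type PodConfig struct {
	lock    sync.Mutex
	sources map[string]chan interface{}
	updates chan PodUpdate
}

func NewPodConfig() *PodConfig {
	return &PodConfig{sources: map[string]chan interface{}{}, updates: make(chan PodUpdate, 50)}
}

// Channel returns the per-source input channel, creating it and its listener once.
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.lock.Lock()
	defer c.lock.Unlock()
	if ch, ok := c.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	c.sources[source] = ch
	go func() {
		for change := range ch {
			u := change.(PodUpdate)
			u.Source = source
			c.updates <- u // stands in for podStorage.Merge
		}
	}()
	return ch
}

// Updates exposes the receive-only side of the merged stream.
func (c *PodConfig) Updates() <-chan PodUpdate { return c.updates }

func main() {
	cfg := NewPodConfig()
	cfg.Channel("api") <- PodUpdate{Op: "SET", Pods: []string{"web_default"}}
	u := <-cfg.Updates()
	fmt.Println(u.Source, u.Op, u.Pods) // api SET [web_default]
}
```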

apiserver source

Let us take the apiserver source as an example of how a source is implemented. It is defined in /pkg/kubelet/config/apiserver.go:

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c *clientset.Clientset, nodeName types.NodeName, updates chan<- interface{}) {
	//***Create a ListWatch on the apiserver for the pods bound to this node***//
	lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", api.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	//***send is the push func used by the UndeltaStore***//
	send := func(objs []interface{}) {
		var pods []*api.Pod
		for _, o := range objs {
			pods = append(pods, o.(*api.Pod))
		}
		//***Op is SET***//
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	//***The Reflector puts the objects from lw into the UndeltaStore***//
	cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}

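NewSourceApiserver() uses the list-watch mechanism to send changes to the channel passed in as updates. That is the channel created by the mux, and therefore the channel whose data the merger processes. Because the UndeltaStore pushes the complete current state after every change, each message is a SET containing all pods bound to this node.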
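The reason every message is a SET lies in the store the Reflector writes into. Below is a toy re-implementation of the UndeltaStore idea, not client-go's code: undeltaStore, with Add/Delete taking string keys, is an assumption made for brevity. The send closure is modeled on the one in newSourceApiserverFromLW.

```go
package main

import (
	"fmt"
	"sort"
)

type PodUpdate struct {
	Op     string
	Pods   []string
	Source string
}

// undeltaStore keeps the current objects and pushes the full list after every change.
type undeltaStore struct {
	items map[string]string
	push  func(objs []string)
}

// snapshot returns all current objects, sorted so the output is deterministic.
func (s *undeltaStore) snapshot() []string {
	out := make([]string, 0, len(s.items))
	for _, v := range s.items {
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

func (s *undeltaStore) Add(key, obj string) {
	s.items[key] = obj
	s.push(s.snapshot())
}

func (s *undeltaStore) Delete(key string) {
	delete(s.items, key)
	s.push(s.snapshot())
}

func main() {
	updates := make(chan PodUpdate, 10)
	// Whatever the change was, it is reported as a SET of the full list.
	send := func(objs []string) {
		updates <- PodUpdate{Pods: objs, Op: "SET", Source: "api"}
	}
	s := &undeltaStore{items: map[string]string{}, push: send}
	s.Add("default/web", "web_default")
	s.Add("default/db", "db_default")
	s.Delete("default/web")
	close(updates)
	for u := range updates {
		fmt.Println(u.Op, u.Pods)
	}
	// SET [web_default]
	// SET [db_default web_default]
	// SET [db_default]
}
```

This is why merge() can treat an apiserver message as the authoritative full list and derive REMOVE from what is missing.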

Invocation

makePodSourceConfig() in /pkg/kubelet/kubelet.go sets up the pod source config:

//***The pod source config handles the sources the kubelet takes pod information from***//
func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName types.NodeName) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if kubeCfg.ManifestURLHeader != "" {
		pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
		if len(pieces) != 2 {
			return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
		}
		manifestURLHeader.Set(pieces[0], pieces[1])
	}

	// source of all configuration
	//***NewPodConfig() creates a channel whose producer is Merge() and whose consumer is the kubelet***//
	//***cfg.Channel() creates a per-source channel whose producer is the source (e.g. the ListWatch) and whose consumer is Merge()***//
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// define file config source
	if kubeCfg.PodManifestPath != "" {
		glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
		config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.ManifestURL != "" {
		glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
		config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}
	if kubeDeps.KubeClient != nil {
		glog.Infof("Watching apiserver")
		//***Channel() returns a new channel***//
		//***NewSourceApiserver() calls newSourceApiserverFromLW(), which puts the pods into that channel***//
		//***ApiserverSource = "api", defined in types.go***//
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
	}
	return cfg, nil
}

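So makePodSourceConfig() creates a new PodConfig and then calls NewSourceFile(), NewSourceURL() and NewSourceApiserver() to create up to three sources, each only when it is configured. Each source writes its data into its own channel, and that channel is obtained from PodConfig's Channel(). The PodConfig can then aggregate and process the data from all three sources, and an outside consumer only needs to call PodConfig's Updates() to get the channel carrying all of the processed data, which is very convenient.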
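Which sources actually get created depends on the configuration. The sketch below reproduces only that selection logic and the ManifestURLHeader parsing from makePodSourceConfig(). kubeletConfig and enabledSources are hypothetical names, and HasKubeClient stands in for kubeDeps.KubeClient != nil. ApiserverSource is "api" per the comment above, while "file" and "http" are assumed to be the values of FileSource and HTTPSource.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// kubeletConfig holds only the fields makePodSourceConfig looks at (hypothetical type).
type kubeletConfig struct {
	PodManifestPath   string
	ManifestURL       string
	ManifestURLHeader string
	HasKubeClient     bool // stands in for kubeDeps.KubeClient != nil
}

// enabledSources returns the sources that would get a channel, in registration
// order, plus the parsed manifest URL header.
func enabledSources(cfg kubeletConfig) ([]string, http.Header, error) {
	header := make(http.Header)
	if cfg.ManifestURLHeader != "" {
		pieces := strings.Split(cfg.ManifestURLHeader, ":")
		if len(pieces) != 2 {
			return nil, nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", cfg.ManifestURLHeader)
		}
		header.Set(pieces[0], pieces[1])
	}
	var sources []string
	if cfg.PodManifestPath != "" {
		sources = append(sources, "file")
	}
	if cfg.ManifestURL != "" {
		sources = append(sources, "http")
	}
	if cfg.HasKubeClient {
		sources = append(sources, "api")
	}
	return sources, header, nil
}

func main() {
	srcs, hdr, err := enabledSources(kubeletConfig{
		PodManifestPath:   "/etc/kubernetes/manifests",
		ManifestURLHeader: "X-Token:abc",
		HasKubeClient:     true,
	})
	fmt.Println(srcs, hdr.Get("X-Token"), err) // [file api] abc <nil>
	_, _, err = enabledSources(kubeletConfig{ManifestURLHeader: "a:b:c"})
	fmt.Println(err != nil) // true
}
```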