resourceVersion

我们先来看下Kubernetes的版本策略。通过Apiserver获取到的所有对象中,都有一个“resourceVersion”字段。如:

1
2
3
4
5
6
apiVersion: v1
kind: Pod
metadata:
  resourceVersion: "879232"
  selfLink: /api/v1/namespaces/default/pods/nginx-1zr5x
  uid: 9910eaf7-f0f3-11e7-a0b3-0800274a4ec3

该Pod的resourceVersion为879232。更新该Pod时,Kubernetes会比较该resourceVersion和ETCD中对象的resourceVersion,只有在两者一致的情况下才会执行更新;一旦发生更新,该对象的resourceVersion值也会改变。所以,resourceVersion相当于一把(乐观)锁。
当然,Kubernetes在resourceVersion值的生成上,并没有实现自己的一套管理机制,而是直接使用了ETCD的index。
在ETCD中,会维护一个全局的index,每发生一次修改操作,该index就会加1。每个key都会维护一个modified index,表示该节点最近一次被修改时的index。Kubernetes借用的正是这个modified index。
那么,既然从ETCD的节点中能获取到resourceVersion(即modified index),就没有必要再把resourceVersion存储到ETCD中了。所以存储在ETCD中的对象并没有resourceVersion字段,而是在获取时动态添加resourceVersion字段。这也就是本次分析将要介绍的内容:Kubernetes是如何动态添加、移除resourceVersion字段的。

List

我们先来看list操作时resourceVersion的添加。来看/pkg/storage/etcd/etcd_helper.go中的List():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// List reads every etcd node under key, appends the decoded objects accepted
// by storage.SimpleFilter(pred) to the Items slice of listObj, and stamps
// listObj itself with the etcd index returned by the list call as its
// resourceVersion.
//
// The resourceVersion parameter is not read by this implementation.
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
	if ctx == nil {
		// A nil context is only logged; the call still proceeds.
		glog.Errorf("Context is nil")
	}
	trace := util.NewTrace("List " + getTypeName(listObj))
	defer trace.LogIfLong(400 * time.Millisecond)

	itemsPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}

	prefixed := h.prefixEtcdKey(key)
	began := time.Now()
	trace.Step("About to list etcd node")
	nodes, etcdIndex, listErr := h.listEtcdNode(ctx, prefixed)
	trace.Step("Etcd node listed")
	// Latency is recorded whether or not the list call succeeded.
	metrics.RecordEtcdRequestLatency("list", getTypeName(itemsPtr), began)
	if listErr != nil {
		return listErr
	}

	// Each item receives its own node ModifiedIndex inside decodeNodeList.
	if err = h.decodeNodeList(nodes, storage.SimpleFilter(pred), itemsPtr); err != nil {
		return err
	}
	trace.Step("Node list decoded")

	// The list as a whole takes the index reported by the list call.
	return h.versioner.UpdateList(listObj, etcdIndex)
}

List()的流程如下:

  1. 为key加上etcd前缀,得到实际要list的key;
  2. 调用listEtcdNode()获取结果;
  3. 调用decodeNodeList()处理list结果;
  4. 用etcd返回的index更新List对象的resourceVersion。

所以,现在List对象的resourceVersion更新了。那么,List的items中的对象是如何更新resourceVersion的呢?这是在List()调用的decodeNodeList()中完成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// decodeNodeList walks nodes depth-first, recursing into directory nodes, and
// appends every leaf object accepted by filter to the slice slicePtr points to.
//
// For each leaf the helper cache is consulted first, keyed by ModifiedIndex.
// On a miss the node value is decoded, its resourceVersion is set to the
// node's ModifiedIndex, and the object is cached when that index is non-zero.
//
// It panics if slicePtr is not a pointer to a slice.
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error {
	trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
	defer trace.LogIfLong(400 * time.Millisecond)

	sliceVal, err := conversion.EnforcePtr(slicePtr)
	if err != nil || sliceVal.Kind() != reflect.Slice {
		// This should not happen at runtime.
		panic("need ptr to slice")
	}
	elemType := sliceVal.Type().Elem()

	for _, node := range nodes {
		if node.Dir {
			trace.Step("Decoding dir " + node.Key + " START")
			if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
				return err
			}
			trace.Step("Decoding dir " + node.Key + " END")
			continue
		}

		cached, hit := h.getFromCache(node.ModifiedIndex, filter)
		if hit {
			// On a hit, cached is nil exactly when the object fails filter.
			if cached != nil {
				sliceVal.Set(reflect.Append(sliceVal, reflect.ValueOf(cached).Elem()))
			}
			continue
		}

		decoded, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(elemType).Interface().(runtime.Object))
		if err != nil {
			return err
		}
		// Failing to set the version does not prevent the object from being extracted.
		_ = h.versioner.UpdateObject(decoded, node.ModifiedIndex)
		if filter(decoded) {
			sliceVal.Set(reflect.Append(sliceVal, reflect.ValueOf(decoded).Elem()))
		}
		// Objects are cached even when filter rejected them.
		if node.ModifiedIndex != 0 {
			h.addToCache(node.ModifiedIndex, decoded)
		}
	}
	trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
	return nil
}

decodeNodeList()会用每个节点的ModifiedIndex更新对应item的resourceVersion(缓存命中时直接复用之前已解码并设置过resourceVersion的对象)。
所以在list操作中,会更新List对象的resourceVersion,也会更新items中每个item的resourceVersion。

Watch

再来看watch操作对resourceVersion的更新。来看/pkg/storage/etcd/etcd_watcher.go的decodeObject():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// decodeObject converts an etcd node into an API object for the watcher.
//
// An object already cached under node.ModifiedIndex is returned as-is.
// Otherwise the node value is decoded, its resourceVersion is set to
// node.ModifiedIndex (a failure there is reported but not fatal), the object
// is passed through w.transform when one is configured, and the result is
// cached when the index is non-zero.
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
	if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found {
		return obj, nil
	}
	obj, err := runtime.Decode(w.encoding, []byte(node.Value))
	if err != nil {
		return nil, err
	}
	// ensure resource version is set on the object we load from etcd
	if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil {
		utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
	}
	// perform any necessary transformation
	if w.transform != nil {
		// Assign to a separate variable so that, on failure, the report shows
		// the object that was handed to transform rather than whatever
		// transform returned alongside its error (which may be nil).
		transformed, err := w.transform(obj)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
			return nil, err
		}
		obj = transformed
	}
	if node.ModifiedIndex != 0 {
		w.cache.addToCache(node.ModifiedIndex, obj)
	}
	return obj, nil
}

可以看到,decodeObject()会调用versioner.UpdateObject()更新object的resourceVersion。

Get

再来看get操作。来看/pkg/storage/etcd/etcd_helper.go中的Get():

1
2
3
4
5
6
7
8
9
10
// Get implements storage.Interface. It reads the value stored under the
// etcd-prefixed form of key and decodes it into objPtr through
// bodyAndExtractObj. With ignoreNotFound set, a missing or empty key leaves
// objPtr zeroed instead of producing an error.
func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
	if ctx == nil {
		glog.Errorf("Context is nil")
	}
	// After prefixing, a key looks like /registry/minions/fankang (example).
	etcdKey := h.prefixEtcdKey(key)
	_, _, _, err := h.bodyAndExtractObj(ctx, etcdKey, objPtr, ignoreNotFound)
	return err
}

Get()调用了bodyAndExtractObj():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// bodyAndExtractObj performs the normal Get path to etcd and decodes the
// result into objPtr. Besides the raw body it returns the etcd node and the
// full response, so callers can inspect extra details such as the current
// etcd index and the ttl.
//
// Errors other than "not found" are returned immediately. A not-found error
// is passed on to extractObj, which decides based on ignoreNotFound.
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
	if ctx == nil {
		glog.Errorf("Context is nil")
	}
	began := time.Now()
	response, getErr := h.etcdKeysAPI.Get(ctx, key, &etcd.GetOptions{Quorum: h.quorum})
	metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), began)

	if getErr != nil && !etcdutil.IsEtcdNotFound(getErr) {
		return "", nil, nil, toStorageErr(getErr, key, 0)
	}

	body, node, err = h.extractObj(response, getErr, objPtr, ignoreNotFound, false)
	return body, node, response, toStorageErr(err, key, 0)
}

bodyAndExtractObj()调用了extractObj():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// extractObj decodes the node carried by response into objPtr and sets the
// object's resourceVersion to that node's ModifiedIndex.
//
// When prevNode is true, response.PrevNode is used instead of response.Node.
// If inErr is non-nil, there is no node, or the node value is empty, then:
// with ignoreNotFound, the value objPtr points to is reset to its zero value
// and ("", nil, nil) is returned; otherwise inErr is returned when set, or an
// error describing the response.
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
	switch {
	case response == nil:
		// node stays nil.
	case prevNode:
		node = response.PrevNode
	default:
		node = response.Node
	}

	missing := inErr != nil || node == nil || len(node.Value) == 0
	if missing {
		if !ignoreNotFound {
			if inErr != nil {
				return "", nil, inErr
			}
			return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
		}
		target, zeroErr := conversion.EnforcePtr(objPtr)
		if zeroErr != nil {
			return "", nil, zeroErr
		}
		target.Set(reflect.Zero(target.Type()))
		return "", nil, nil
	}

	body = node.Value
	decoded, gvk, decodeErr := h.codec.Decode([]byte(body), nil, objPtr)
	if decodeErr != nil {
		return body, nil, decodeErr
	}
	if decoded != objPtr {
		return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
	}
	// Failing to set the version does not prevent the object from being extracted.
	_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
	return body, node, nil
}

extractObj()会更新resourceVersion的信息。

APIObjectVersioner

可以看到,resourceVersion是由APIObjectVersioner完成更新的。APIObjectVersioner主要支持UpdateObject()和UpdateList()两种操作,对应更新普通object的resourceVersion和更新List对象的resourceVersion。APIObjectVersioner定义在/pkg/storage/etcd/api_object_versioner.go中。
APIObjectVersioner实现比较简单,不再具体分析。

APIObjectVersioner::UpdateObject()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// UpdateObject implements Versioner. It writes resourceVersion into obj's
// metadata as a base-10 string. A resourceVersion of 0 is written as the
// empty string.
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return err
	}
	var rv string
	if resourceVersion != 0 {
		rv = strconv.FormatUint(resourceVersion, 10)
	}
	accessor.SetResourceVersion(rv)
	return nil
}

APIObjectVersioner::UpdateList()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// UpdateList implements Versioner. It stores resourceVersion on the list's
// ListMeta as a base-10 string, or as "" when resourceVersion is 0.
// An error from the ListMeta lookup is returned. If the lookup yields a nil
// ListMeta without an error, nothing is written and nil is returned.
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error {
	listMeta, err := api.ListMetaFor(obj)
	switch {
	case err != nil:
		return err
	case listMeta == nil:
		return nil
	}
	listMeta.ResourceVersion = ""
	if resourceVersion != 0 {
		listMeta.ResourceVersion = strconv.FormatUint(resourceVersion, 10)
	}
	return nil
}

APIObjectVersioner::ObjectResourceVersion()

1
2
3
4
5
6
7
8
9
10
11
12
13
// ObjectResourceVersion implements Versioner. It reads obj's resourceVersion
// and parses it as a base-10 uint64. An empty resourceVersion yields 0 with a
// nil error.
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return 0, err
	}
	if rv := accessor.GetResourceVersion(); rv != "" {
		return strconv.ParseUint(rv, 10, 64)
	}
	return 0, nil
}