resourceVersion
我们先来看下Kubernetes的版本策略。通过APIServer获取到的所有对象中,都有一个“resourceVersion”字段。如:
```yaml
apiVersion: v1
kind: Pod
metadata:
  resourceVersion: "879232"
  selfLink: /api/v1/namespaces/default/pods/nginx-1zr5x
  uid: 9910eaf7-f0f3-11e7-a0b3-0800274a4ec3
```
该Pod的resourceVersion为879232。更新该Pod时,Kubernetes会比较请求中的resourceVersion和ETCD中对象的resourceVersion,只有两者一致时才会执行更新;一旦更新成功,该对象的resourceVersion值也会随之改变。所以,resourceVersion相当于一把乐观锁。
当然,Kubernetes在resourceVersion值的生成上,并没有实现自己的一套管理机制,而是直接使用了ETCD的index。
在ETCD中,会维护一个全局的index,每发生一次修改操作,该index就会加1。每个key都会维护一个modified index,记录该key最近一次被修改时的index。Kubernetes正是借用了这个modified index作为resourceVersion。
那么,既然从ETCD的节点中就能获取到resourceVersion(即modified index),也就没必要再把resourceVersion存储到ETCD中了。所以存储在ETCD中的对象并没有resourceVersion字段,而是在读取时动态添加该字段。这也就是本次分析将要介绍的内容:Kubernetes如何动态添加、移除resourceVersion字段。
List
我们先来看list操作时resourceVersion的添加。来看/pkg/storage/etcd/etcd_helper.go中的List():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { if ctx == nil { glog.Errorf("Context is nil") } trace := util.NewTrace("List " + getTypeName(listObj)) defer trace.LogIfLong(400 * time.Millisecond) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err } key = h.prefixEtcdKey(key) startTime := time.Now() trace.Step("About to list etcd node") nodes, index, err := h.listEtcdNode(ctx, key) trace.Step("Etcd node listed") metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) if err != nil { return err } if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil { return err } trace.Step("Node list decoded") if err := h.versioner.UpdateList(listObj, index); err != nil { return err } return nil }
|
List()的流程如下:
- 调用prefixEtcdKey()为key加上etcd前缀,得到要list的key;
- 调用listEtcdNode()获取结果;
- 调用decodeNodeList()处理list结果;
- 更新List对象的resourceVersion。
所以,现在List对象的resourceVersion更新了。那么,List的items中的对象是如何更新resourceVersion的呢?这部分工作主要由decodeNodeList()完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error { trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr)) defer trace.LogIfLong(400 * time.Millisecond) v, err := conversion.EnforcePtr(slicePtr) if err != nil || v.Kind() != reflect.Slice { panic("need ptr to slice") } for _, node := range nodes { if node.Dir { trace.Step("Decoding dir " + node.Key + " START") if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil { return err } trace.Step("Decoding dir " + node.Key + " END") continue } if obj, found := h.getFromCache(node.ModifiedIndex, filter); found { if obj != nil { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } } else { obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) if err != nil { return err } _ = h.versioner.UpdateObject(obj, node.ModifiedIndex) if filter(obj) { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } if node.ModifiedIndex != 0 { h.addToCache(node.ModifiedIndex, obj) } } } trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes))) return nil }
|
decodeNodeList()会对每个新解码出的item调用versioner.UpdateObject(),用node.ModifiedIndex设置其resourceVersion;命中缓存的item则直接复用缓存中的对象。
所以在list操作中,会更新List对象的resourceVersion,也会更新items中每个item的resourceVersion。
Watch
再来看watch操作对resourceVersion的更新。来看/pkg/storage/etcd/etcd_watcher.go中的decodeObject():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found { return obj, nil } obj, err := runtime.Decode(w.encoding, []byte(node.Value)) if err != nil { return nil, err } if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil { utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)) } if w.transform != nil { obj, err = w.transform(obj) if err != nil { utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err)) return nil, err } } if node.ModifiedIndex != 0 { w.cache.addToCache(node.ModifiedIndex, obj) } return obj, nil }
|
可以看到,decodeObject()会调用versioner.UpdateObject()更新object的resourceVersion。
Get
再来看get操作。来看/pkg/storage/etcd/etcd_helper.go中的Get():
1 2 3 4 5 6 7 8 9 10
| func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error { if ctx == nil { glog.Errorf("Context is nil") } key = h.prefixEtcdKey(key) _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) return err }
|
Get()调用了bodyAndExtractObj():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { if ctx == nil { glog.Errorf("Context is nil") } startTime := time.Now() opts := &etcd.GetOptions{ Quorum: h.quorum, } response, err := h.etcdKeysAPI.Get(ctx, key, opts) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) if err != nil && !etcdutil.IsEtcdNotFound(err) { return "", nil, nil, toStorageErr(err, key, 0) } body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) return body, node, response, toStorageErr(err, key, 0) }
|
bodyAndExtractObj()调用了extractObj():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) { if response != nil { if prevNode { node = response.PrevNode } else { node = response.Node } } if inErr != nil || node == nil || len(node.Value) == 0 { if ignoreNotFound { v, err := conversion.EnforcePtr(objPtr) if err != nil { return "", nil, err } v.Set(reflect.Zero(v.Type())) return "", nil, nil } else if inErr != nil { return "", nil, inErr } return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) } body = node.Value out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) if err != nil { return body, nil, err } if out != objPtr { return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) } _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex) return body, node, err }
|
extractObj()会更新resourceVersion的信息。
APIObjectVersioner
可以看到,resourceVersion是由APIObjectVersioner完成更新的。APIObjectVersioner主要支持UpdateObject()和UpdateList()两种操作,对应更新普通object的resourceVersion和更新List对象的resourceVersion。APIObjectVersioner定义在/pkg/storage/etcd/api_object_versioner.go中。
APIObjectVersioner实现比较简单,不再具体分析。
APIObjectVersioner::UpdateObject()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { accessor, err := meta.Accessor(obj) if err != nil { return err } versionString := "" if resourceVersion != 0 { versionString = strconv.FormatUint(resourceVersion, 10) } accessor.SetResourceVersion(versionString) return nil }
|
APIObjectVersioner::UpdateList()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error { listMeta, err := api.ListMetaFor(obj) if err != nil || listMeta == nil { return err } versionString := "" if resourceVersion != 0 { versionString = strconv.FormatUint(resourceVersion, 10) } listMeta.ResourceVersion = versionString return nil }
|
APIObjectVersioner::ObjectResourceVersion()
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { accessor, err := meta.Accessor(obj) if err != nil { return 0, err } version := accessor.GetResourceVersion() if len(version) == 0 { return 0, nil } return strconv.ParseUint(version, 10, 64) }
|