Introduction
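Previously, we analyzed Kubernetes's reflector mechanism: a reflector consumes the events in a watch channel and stores them in a store. This article walks through the main call flow of Kubernetes's list-watch mechanism, in two parts:
- the component sends requests to the apiserver;
- the apiserver sends requests to etcd.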
Let's start with how a component sends a list-watch request to the apiserver. Here is an example from /pkg/kubelet/config/apiserver.go:
```go
func NewSourceApiserver(c *clientset.Clientset, nodeName types.NodeName, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", api.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
	newSourceApiserverFromLW(lw, updates)
}
```
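The ListWatch is created by lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", api.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName))), that is, it lists and watches pods in all namespaces whose host field (spec.nodeName) equals this node's name.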
So let's first look at the definition of ListWatch.
ListWatch
ListWatch is defined in /pkg/client/cache/listwatch.go:
```go
type ListerWatcher interface {
	List(options api.ListOptions) (runtime.Object, error)
	Watch(options api.ListOptions) (watch.Interface, error)
}

type ListFunc func(options api.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options api.ListOptions) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
}
```
So any struct that implements List() and Watch() is a ListerWatcher. The ListWatch struct has exactly the ListFunc and WatchFunc fields, and it also implements List() and Watch():
```go
func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) {
	return lw.ListFunc(options)
}

func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}
```
List() and Watch() simply call ListFunc() and WatchFunc() respectively. Next, let's look at how a ListWatch is constructed, namely the NewListWatchFromClient() mentioned above:
```go
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	// listFunc: a plain GET on the resource, decoded into a list object
	listFunc := func(options api.ListOptions) (runtime.Object, error) {
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, api.ParameterCodec).
			FieldsSelectorParam(fieldSelector).
			Do().
			Get()
	}
	// watchFunc: the same request under the "watch" prefix, returned as a streaming watcher
	watchFunc := func(options api.ListOptions) (watch.Interface, error) {
		return c.Get().
			Prefix("watch").
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, api.ParameterCodec).
			FieldsSelectorParam(fieldSelector).
			Watch()
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
```
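NewListWatchFromClient() builds listFunc and watchFunc as closures over its arguments. Both start from c.Get() and set the namespace, resource, versioned ListOptions and field selector; watchFunc additionally adds the "watch" path prefix and ends with Watch() instead of Do().Get(). In the kubelet example above, c is c.Core().RESTClient().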
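The pattern here (an interface satisfied by a struct whose methods just forward to function fields, with those functions built as closures over the constructor's arguments) can be reproduced in plain Go. This is a minimal sketch that covers only the List half; Options, Object, FuncLister and the in-memory fetch function are hypothetical stand-ins for api.ListOptions, runtime.Object, ListWatch and the REST client, not the real client types.

```go
package main

import (
	"fmt"
	"strings"
)

// Options is a hypothetical stand-in for api.ListOptions.
type Options struct {
	ResourceVersion string
}

// Object is a hypothetical stand-in for runtime.Object.
type Object []string

// Lister mirrors the List half of cache.ListerWatcher (simplified).
type Lister interface {
	List(opts Options) (Object, error)
}

// ListFunc is a function type that can be stored in a struct field.
type ListFunc func(opts Options) (Object, error)

// FuncLister satisfies Lister by forwarding to its function field,
// the same way ListWatch forwards List() to ListFunc.
type FuncLister struct {
	ListFunc ListFunc
}

// compile-time check that *FuncLister implements Lister
var _ Lister = &FuncLister{}

func (l *FuncLister) List(opts Options) (Object, error) { return l.ListFunc(opts) }

// NewFuncLister captures resource and namespace in a closure, analogous to
// NewListWatchFromClient. fetch is a hypothetical backend call that plays
// the role of the REST request in the real code.
func NewFuncLister(fetch func(path string) Object, resource, namespace string) *FuncLister {
	return &FuncLister{
		ListFunc: func(opts Options) (Object, error) {
			path := strings.Join([]string{"namespaces", namespace, resource}, "/")
			return fetch(path), nil
		},
	}
}

func main() {
	fetch := func(path string) Object { return Object{"requested " + path} }
	var l Lister = NewFuncLister(fetch, "pods", "default")
	out, _ := l.List(Options{})
	fmt.Println(out[0]) // requested namespaces/default/pods
}
```

Callers only see the interface, so a test can swap in a closure over fake data without touching the consumer, which is the convenience the ListWatch comment refers to.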
So next, let's look at RESTClient.
RESTClient
RESTClient was analyzed in "kubernetes-client分析(二)-restclient-v1.5.2", but watch was not covered in detail there. This section therefore looks at the Watch() method, defined in /pkg/client/restclient/request.go:
```go
func (r *Request) Watch() (watch.Interface, error) {
	if r.err != nil {
		return nil, r.err
	}
	// watching requires a stream framer for the negotiated content type
	if r.serializers.Framer == nil {
		return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
	}

	url := r.URL().String()
	req, err := http.NewRequest(r.verb, url, r.body)
	if err != nil {
		return nil, err
	}
	req.Header = r.headers
	client := r.client
	if client == nil {
		client = http.DefaultClient
	}
	// wait out any backoff recorded for this URL before sending
	r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
	resp, err := client.Do(req)
	updateURLMetrics(r, resp, err)
	if r.baseURL != nil {
		if err != nil {
			r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
		} else {
			r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
		}
	}
	if err != nil {
		// a connection that closed early is reported as an empty watch, not an error
		if net.IsProbableEOF(err) {
			return watch.NewEmptyWatch(), nil
		}
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		defer resp.Body.Close()
		if result := r.transformResponse(resp, req); result.err != nil {
			return nil, result.err
		}
		return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
	}
	// wrap the body: frame reader -> streaming decoder -> versioned event decoder
	framer := r.serializers.Framer.NewFrameReader(resp.Body)
	decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
	return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil
}
```
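The code first sends the request, then wraps the response body in a frame reader and decoder, and finally calls watch.NewStreamWatcher() to create a stream watcher. A connection failure that looks like a premature EOF yields an empty watch rather than an error, and a non-200 status is turned into an error.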
StreamWatcher
StreamWatcher is defined in /pkg/watch/streamwatcher.go:
```go
type StreamWatcher struct {
	sync.Mutex          // guards stopped
	source  Decoder     // decodes events from the response stream
	result  chan Event  // events handed to the consumer
	stopped bool
}
```
A StreamWatcher reads events from source and puts them into the result channel. Here is its constructor:
```go
func NewStreamWatcher(d Decoder) *StreamWatcher {
	sw := &StreamWatcher{
		source: d,
		// unbuffered: each event is handed directly to the consumer
		result: make(chan Event),
	}
	go sw.receive()
	return sw
}
```
After creating the StreamWatcher, NewStreamWatcher() starts receive() in a new goroutine. receive() is defined as follows:
```go
func (sw *StreamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop()
	defer utilruntime.HandleCrash()
	for {
		action, obj, err := sw.source.Decode()
		if err != nil {
			// errors after Stop() are expected, so ignore them
			if sw.stopping() {
				return
			}
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
			default:
				msg := "Unable to decode an event from the watch stream: %v"
				if net.IsProbableEOF(err) {
					glog.V(5).Infof(msg, err)
				} else {
					glog.Errorf(msg, err)
				}
			}
			return
		}
		sw.result <- Event{
			Type:   action,
			Object: obj,
		}
	}
}
```
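receive() repeatedly decodes an action and obj from source, wraps them in an Event and sends it to the result channel. On any decode error (including a normal io.EOF when the server ends the stream) it returns, and the deferred calls stop the watcher and close the result channel.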
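The receive loop can be rebuilt in miniature to show the producer/consumer contract: a goroutine decodes until an error, forwards events on an unbuffered channel, and on exit stops the watcher (closing the source once) and then closes the channel. This sketch assumes a simplified Decoder interface and Event type and a hypothetical in-memory sliceDecoder; it omits the crash handling and logging of the real StreamWatcher.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// Event is a simplified watch event.
type Event struct {
	Type   string
	Object string
}

// Decoder is a simplified version of watch.Decoder.
type Decoder interface {
	Decode() (action string, obj string, err error)
	Close()
}

// StreamWatcher turns a Decoder into a channel of Events.
type StreamWatcher struct {
	sync.Mutex
	source  Decoder
	result  chan Event
	stopped bool
}

func NewStreamWatcher(d Decoder) *StreamWatcher {
	sw := &StreamWatcher{source: d, result: make(chan Event)}
	go sw.receive()
	return sw
}

func (sw *StreamWatcher) ResultChan() <-chan Event { return sw.result }

// Stop closes the source only once; later calls are no-ops.
func (sw *StreamWatcher) Stop() {
	sw.Lock()
	defer sw.Unlock()
	if !sw.stopped {
		sw.stopped = true
		sw.source.Close()
	}
}

func (sw *StreamWatcher) stopping() bool {
	sw.Lock()
	defer sw.Unlock()
	return sw.stopped
}

// receive decodes until an error (io.EOF included). Deferred calls run in
// reverse order: Stop() first, then close(result), which ends range loops.
func (sw *StreamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop()
	for {
		action, obj, err := sw.source.Decode()
		if err != nil {
			if !sw.stopping() && err != io.EOF {
				fmt.Println("decode error:", err)
			}
			return
		}
		sw.result <- Event{Type: action, Object: obj}
	}
}

// sliceDecoder is a hypothetical in-memory Decoder for the demo.
type sliceDecoder struct {
	events []Event
	closed bool
}

func (d *sliceDecoder) Decode() (string, string, error) {
	if len(d.events) == 0 {
		return "", "", io.EOF
	}
	ev := d.events[0]
	d.events = d.events[1:]
	return ev.Type, ev.Object, nil
}

func (d *sliceDecoder) Close() { d.closed = true }

func main() {
	d := &sliceDecoder{events: []Event{{"ADDED", "pod-a"}, {"MODIFIED", "pod-a"}}}
	w := NewStreamWatcher(d)
	for ev := range w.ResultChan() {
		fmt.Println(ev.Type, ev.Object)
	}
	fmt.Println("source closed:", d.closed)
}
```

Because Stop() runs before close(result), the consumer's range loop only ends after the source has been closed, so reading d.closed afterwards is safe.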
At this point the result channel holds the Events coming from the apiserver, so a method is needed to hand the result channel to callers for consumption:
```go
func (sw *StreamWatcher) ResultChan() <-chan Event {
	return sw.result
}
```
ResultChan() returns the StreamWatcher's result channel. ResultChan() is part of watch.Interface, so every watcher implementation defines it to expose its result channel.
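From the caller's side, every watcher is consumed the same way: range over ResultChan() until the channel is closed, and call Stop() to end the watch early. The sketch assumes a simplified Interface with the same two methods as watch.Interface (Stop and ResultChan), plus a hypothetical chanWatcher whose events are pushed by hand; it is not the real watch package.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct {
	Type   string
	Object string
}

// Interface mirrors the two methods of watch.Interface.
type Interface interface {
	Stop()
	ResultChan() <-chan Event
}

// chanWatcher is a hypothetical watcher whose events are pushed by hand.
type chanWatcher struct {
	once   sync.Once
	result chan Event
}

func newChanWatcher(buffer int) *chanWatcher {
	return &chanWatcher{result: make(chan Event, buffer)}
}

func (w *chanWatcher) ResultChan() <-chan Event { return w.result }

// Stop closes the result channel exactly once, so repeated calls are safe.
func (w *chanWatcher) Stop() { w.once.Do(func() { close(w.result) }) }

// consume drains any Interface until its channel is closed and counts
// the events by type.
func consume(w Interface) map[string]int {
	counts := map[string]int{}
	for ev := range w.ResultChan() {
		counts[ev.Type]++
	}
	return counts
}

func main() {
	w := newChanWatcher(3)
	w.result <- Event{"ADDED", "pod-a"}
	w.result <- Event{"MODIFIED", "pod-a"}
	w.result <- Event{"ADDED", "pod-b"}
	w.Stop()
	fmt.Println(consume(w)) // map[ADDED:2 MODIFIED:1]
}
```

consume depends only on the interface, which is why a reflector can work unchanged with a StreamWatcher on the client or any other watcher implementation.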
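How an obj is decoded from the stream will be analyzed in a later article; ultimately it also relies on the json package. For how the json package parses data in a stream, see "kubernetes-watcher-demo".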
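The component has now established a connection with the apiserver and is decoding the data stream on that connection. Next, let's see how the apiserver responds to a watch.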
apiserver
On the apiserver side, list and watch are handled by the ListResource() REST handler, defined in /pkg/apiserver/resthandler.go:
```go
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		trace := util.NewTrace("List " + req.Request.URL.Path)

		w := res.ResponseWriter

		namespace, err := scope.Namer.Namespace(req)
		if err != nil {
			scope.err(err, res.ResponseWriter, req.Request)
			return
		}

		// a name in the path means a single object is requested
		hasName := true
		_, name, err := scope.Namer.Name(req)
		if err != nil {
			hasName = false
		}

		ctx := scope.ContextFunc(req)
		ctx = api.WithNamespace(ctx, namespace)

		// decode query parameters (watch, resourceVersion, selectors, ...) into ListOptions
		opts := api.ListOptions{}
		if err := scope.ParameterCodec.DecodeParameters(req.Request.URL.Query(), scope.Kind.GroupVersion(), &opts); err != nil {
			scope.err(err, res.ResponseWriter, req.Request)
			return
		}

		// convert versioned field labels to internal ones
		if opts.FieldSelector != nil {
			fn := func(label, value string) (newLabel, newValue string, err error) {
				return scope.Convertor.ConvertFieldLabel(scope.Kind.GroupVersion().String(), scope.Kind.Kind, label, value)
			}
			if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
				err = errors.NewBadRequest(err.Error())
				scope.err(err, res.ResponseWriter, req.Request)
				return
			}
		}

		// a name is turned into a metadata.name field selector
		if hasName {
			nameSelector := fields.OneTermEqualSelector("metadata.name", name)
			if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
				scope.err(errors.NewBadRequest("both a name and a field selector provided; please provide one or the other."), res.ResponseWriter, req.Request)
				return
			}
			opts.FieldSelector = nameSelector
		}

		// watch path
		if (opts.Watch || forceWatch) && rw != nil {
			watcher, err := rw.Watch(ctx, &opts)
			if err != nil {
				scope.err(err, res.ResponseWriter, req.Request)
				return
			}
			timeout := time.Duration(0)
			if opts.TimeoutSeconds != nil {
				timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
			}
			if timeout == 0 && minRequestTimeout > 0 {
				timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
			}
			serveWatch(watcher, scope, req, res, timeout)
			return
		}

		// list path
		defer trace.LogIfLong(500 * time.Millisecond)
		trace.Step("About to List from storage")
		result, err := r.List(ctx, &opts)
		if err != nil {
			scope.err(err, res.ResponseWriter, req.Request)
			return
		}
		trace.Step("Listing from storage done")
		numberOfItems, err := setListSelfLink(result, req, scope.Namer)
		if err != nil {
			scope.err(err, res.ResponseWriter, req.Request)
			return
		}
		trace.Step("Self-linking done")
		write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
		trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems))
	}
}
```
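ListResource() covers both the list and the watch operation. The list path is simple: it calls the storage's List() directly. The watch path calls rw.Watch() to obtain a watcher, picks a timeout (timeoutSeconds if the client set it, otherwise a random value between minRequestTimeout and twice that), and then calls serveWatch(), defined in /pkg/apiserver/watch.go: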
```go
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
	// negotiate the stream serializer from the request's Accept header
	serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
	if err != nil {
		scope.err(err, res.ResponseWriter, req.Request)
		return
	}
	framer := serializer.StreamSerializer.Framer
	streamSerializer := serializer.StreamSerializer.Serializer
	embedded := serializer.Serializer
	if framer == nil {
		scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), res.ResponseWriter, req.Request)
		return
	}
	// encoder for the event envelope
	encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())

	useTextFraming := serializer.EncodesAsText

	// encoder for the object embedded in each event
	embeddedEncoder := scope.Serializer.EncoderForVersion(embedded, scope.Kind.GroupVersion())

	mediaType := serializer.MediaType
	if mediaType != runtime.ContentTypeJSON {
		mediaType += ";stream=watch"
	}

	server := &WatchServer{
		watching:        watcher,
		scope:           scope,
		useTextFraming:  useTextFraming,
		mediaType:       mediaType,
		framer:          framer,
		encoder:         encoder,
		embeddedEncoder: embeddedEncoder,
		// fixup sets the selfLink of every object before it is sent
		fixup: func(obj runtime.Object) {
			if err := setSelfLink(obj, req, scope.Namer); err != nil {
				utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
			}
		},
		t: &realTimeoutFactory{timeout},
	}

	server.ServeHTTP(res.ResponseWriter, req.Request)
}
```
serveWatch() negotiates the serializers, builds a WatchServer, and then calls its ServeHTTP():
```go
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	w = httplog.Unlogged(w)

	// websocket clients are served by a separate handler
	if wsstream.IsWebSocketRequest(req) {
		w.Header().Set("Content-Type", s.mediaType)
		websocket.Handler(s.HandleWS).ServeHTTP(w, req)
		return
	}

	cn, ok := w.(http.CloseNotifier)
	if !ok {
		err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w)
		utilruntime.HandleError(err)
		s.scope.err(errors.NewInternalError(err), w, req)
		return
	}
	flusher, ok := w.(http.Flusher)
	if !ok {
		err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
		utilruntime.HandleError(err)
		s.scope.err(errors.NewInternalError(err), w, req)
		return
	}

	framer := s.framer.NewFrameWriter(w)
	if framer == nil {
		err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType)
		utilruntime.HandleError(err)
		s.scope.err(errors.NewBadRequest(err.Error()), w, req)
		return
	}
	e := streaming.NewEncoder(framer, s.encoder)

	// make sure the connection times out and the watcher is stopped on return
	timeoutCh, cleanup := s.t.TimeoutCh()
	defer cleanup()
	defer s.watching.Stop()

	// start the chunked stream
	w.Header().Set("Content-Type", s.mediaType)
	w.Header().Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	var unknown runtime.Unknown
	internalEvent := &versioned.InternalEvent{}
	buf := &bytes.Buffer{}
	ch := s.watching.ResultChan()
	for {
		select {
		case <-cn.CloseNotify():
			return
		case <-timeoutCh:
			return
		case event, ok := <-ch:
			if !ok {
				// end of results
				return
			}

			// encode the object first, then embed the raw bytes in the event
			obj := event.Object
			s.fixup(obj)
			if err := s.embeddedEncoder.Encode(obj, buf); err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
				return
			}
			unknown.Raw = buf.Bytes()
			event.Object = &unknown

			*internalEvent = versioned.InternalEvent(event)
			if err := e.Encode(internalEvent); err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
				return
			}
			// flush only when no further events are queued
			if len(ch) == 0 {
				flusher.Flush()
			}

			buf.Reset()
		}
	}
}
```
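ServeHTTP() takes events from the storage watcher's result channel, encodes each one with the stream encoder, and writes it to the response stream, flushing whenever the channel has been drained. It returns when the client disconnects, the timeout fires, or the result channel is closed.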
genericStore
Next, let's look at how genericStore implements List() and Watch(), in /pkg/registry/generic/registry/store.go. First, List():
```go
func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object, error) {
	label := labels.Everything()
	if options != nil && options.LabelSelector != nil {
		label = options.LabelSelector
	}
	field := fields.Everything()
	if options != nil && options.FieldSelector != nil {
		field = options.FieldSelector
	}
	out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
	if err != nil {
		return nil, err
	}
	if e.Decorator != nil {
		if err := e.Decorator(out); err != nil {
			return nil, err
		}
	}
	return out, nil
}
```
List() builds a predicate from the label and field selectors and calls ListPredicate():
```go
func (e *Store) ListPredicate(ctx api.Context, p storage.SelectionPredicate, options *api.ListOptions) (runtime.Object, error) {
	if options == nil {
		options = &api.ListOptions{ResourceVersion: ""}
	}
	list := e.NewListFunc()
	if name, ok := p.MatchesSingle(); ok {
		if key, err := e.KeyFunc(ctx, name); err == nil {
			err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
			return list, storeerr.InterpretListError(err, e.QualifiedResource)
		}
	}

	err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
	return list, storeerr.InterpretListError(err, e.QualifiedResource)
}
```
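ListPredicate() calls the underlying storage's List() (or GetToList() when the predicate pins a single object name). Here p is a SelectionPredicate, which matches objects against the label and field selectors.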
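The role of the SelectionPredicate p can be illustrated with a toy version: it holds a label selector and a field selector, matches an object against both, and reports through MatchesSingle() whether the field selector pins metadata.name, which is what lets ListPredicate() use KeyFunc and GetToList() instead of listing the whole key range. This sketch models selectors as exact-match maps; object, predicate and matchAll are hypothetical simplifications, not storage.SelectionPredicate itself.

```go
package main

import "fmt"

// object is a simplified API object: labels plus selectable fields.
type object struct {
	Labels map[string]string
	Fields map[string]string // e.g. "metadata.name", "spec.nodeName"
}

// predicate is a simplified selection predicate with exact-match selectors.
type predicate struct {
	Label map[string]string
	Field map[string]string
}

// matchAll reports whether every key/value in sel is present in have.
func matchAll(sel, have map[string]string) bool {
	for k, v := range sel {
		if have[k] != v {
			return false
		}
	}
	return true
}

// Matches reports whether obj satisfies both selectors.
func (p predicate) Matches(obj object) bool {
	return matchAll(p.Label, obj.Labels) && matchAll(p.Field, obj.Fields)
}

// MatchesSingle returns the object name when the field selector pins
// metadata.name, signalling that a single-key lookup is enough.
func (p predicate) MatchesSingle() (string, bool) {
	name, ok := p.Field["metadata.name"]
	return name, ok
}

func main() {
	pods := []object{
		{Labels: map[string]string{"app": "web"}, Fields: map[string]string{"metadata.name": "web-1", "spec.nodeName": "node1"}},
		{Labels: map[string]string{"app": "web"}, Fields: map[string]string{"metadata.name": "web-2", "spec.nodeName": "node2"}},
	}
	p := predicate{Label: map[string]string{"app": "web"}, Field: map[string]string{"spec.nodeName": "node1"}}
	for _, pod := range pods {
		if p.Matches(pod) {
			fmt.Println("matched", pod.Fields["metadata.name"]) // matched web-1
		}
	}
	_, single := p.MatchesSingle()
	fmt.Println("single-object lookup:", single) // false
}
```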
Now Watch():
```go
func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) {
	label := labels.Everything()
	if options != nil && options.LabelSelector != nil {
		label = options.LabelSelector
	}
	field := fields.Everything()
	if options != nil && options.FieldSelector != nil {
		field = options.FieldSelector
	}
	resourceVersion := ""
	if options != nil {
		resourceVersion = options.ResourceVersion
	}
	return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
}
```
Watch() mainly calls WatchPredicate():
```go
func (e *Store) WatchPredicate(ctx api.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
	// a field selector that pins metadata.name means a single object is watched
	if name, ok := p.MatchesSingle(); ok {
		if key, err := e.KeyFunc(ctx, name); err == nil {
			// err is always nil inside this branch, so this check never fires
			if err != nil {
				return nil, err
			}
			w, err := e.Storage.Watch(ctx, key, resourceVersion, p)
			if err != nil {
				return nil, err
			}
			if e.Decorator != nil {
				return newDecoratedWatcher(w, e.Decorator), nil
			}
			return w, nil
		}
	}

	// otherwise watch the whole collection under the key root
	w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
	if err != nil {
		return nil, err
	}
	if e.Decorator != nil {
		return newDecoratedWatcher(w, e.Decorator), nil
	}
	return w, nil
}
```
WatchPredicate() checks what is being watched: for a single object it calls the underlying storage's Watch(), otherwise it calls WatchList().
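The single-object versus collection split comes down to which key is watched: KeyFunc yields the key of one object, KeyRootFunc the prefix of the whole collection. A sketch with a hypothetical watchTarget helper, assuming a /registry/<resource>/<namespace>/<name> key layout (the actual prefix depends on the apiserver's etcd prefix setting and each resource's registry code):

```go
package main

import (
	"fmt"
	"path"
)

// watchTarget is a hypothetical helper mirroring WatchPredicate's branch:
// a pinned name selects one object key (Storage.Watch), otherwise the
// collection prefix is watched (Storage.WatchList).
func watchTarget(resource, namespace, name string) (key string, list bool) {
	// an empty namespace (all namespaces) drops out of the joined path
	root := path.Join("/registry", resource, namespace) // like KeyRootFunc
	if name != "" {
		return path.Join(root, name), false // like KeyFunc
	}
	return root, true
}

func main() {
	fmt.Println(watchTarget("pods", "default", "web-1")) // /registry/pods/default/web-1 false
	fmt.Println(watchTarget("pods", "default", ""))      // /registry/pods/default true
}
```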
storage
Now let's look at the underlying storage's List(), Watch() and WatchList(), defined in /pkg/storage/etcd/etcd_helper.go. List() comes first:
```go
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
	if ctx == nil {
		glog.Errorf("Context is nil")
	}
	trace := util.NewTrace("List " + getTypeName(listObj))
	defer trace.LogIfLong(400 * time.Millisecond)
	listPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	key = h.prefixEtcdKey(key)
	startTime := time.Now()
	trace.Step("About to list etcd node")
	nodes, index, err := h.listEtcdNode(ctx, key)
	trace.Step("Etcd node listed")
	metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
	if err != nil {
		return err
	}
	if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
		return err
	}
	trace.Step("Node list decoded")
	if err := h.versioner.UpdateList(listObj, index); err != nil {
		return err
	}
	return nil
}
```
Watch() is defined as follows:
```go
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
	if ctx == nil {
		glog.Errorf("Context is nil")
	}
	watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
	if err != nil {
		return nil, err
	}
	key = h.prefixEtcdKey(key)
	w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
	go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
	return w, nil
}
```
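WatchList() differs from Watch() only in how it creates the etcdWatcher: the first argument (list) is true, so the whole key prefix is watched, and exceptKey(key) filters out the prefix key itself: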
```go
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
	if ctx == nil {
		glog.Errorf("Context is nil")
	}
	watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
	if err != nil {
		return nil, err
	}
	key = h.prefixEtcdKey(key)
	w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
	go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
	return w, nil
}
```
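Both Watch() and WatchList() start by converting the client's resourceVersion string with storage.ParseWatchResourceVersion. A sketch of that conversion, assuming the v1.5 semantics where an empty string or "0" yields 0 (no explicit starting version) and anything else must parse as a base-10 uint64; parseWatchResourceVersion is a hypothetical name, and its error is simplified to a plain error instead of the API invalid-value error the real function returns.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseWatchResourceVersion is a simplified sketch of
// storage.ParseWatchResourceVersion: "" and "0" map to 0, anything else
// must be a base-10 uint64.
func parseWatchResourceVersion(rv string) (uint64, error) {
	if rv == "" || rv == "0" {
		return 0, nil
	}
	v, err := strconv.ParseUint(rv, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid resourceVersion %q: %v", rv, err)
	}
	return v, nil
}

func main() {
	for _, rv := range []string{"", "0", "12345", "abc"} {
		v, err := parseWatchResourceVersion(rv)
		fmt.Printf("%q -> %d, error: %v\n", rv, v, err != nil)
	}
}
```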
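For etcdWatcher, see "storage解读(五)-etcdWatcher-v1.5.2". One point worth noting: the metadata of objects stored in etcd has no resourceVersion field, yet objects returned by kubectl get do have one. This is because Get(), List() and the other functions in etcd_helper.go call APIObjectVersioner's UpdateObject() or UpdateList(), which fill resourceVersion in from the etcd index (for List(), the index returned by listEtcdNode() above).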
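The effect of the versioner can be sketched as follows: the bytes stored in etcd carry no resourceVersion, and on read the etcd index is written into the decoded object's metadata as a decimal string (left empty for index 0, which is assumed to mirror APIObjectVersioner.UpdateObject). The objectMeta/pod types and the updateObject and decodeFromEtcd helpers are hypothetical simplifications; the real versioner works on any runtime.Object through the meta accessor.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

type objectMeta struct {
	Name            string `json:"name"`
	ResourceVersion string `json:"resourceVersion,omitempty"`
}

type pod struct {
	Metadata objectMeta `json:"metadata"`
}

// updateObject sets resourceVersion from an etcd index, leaving it empty
// for index 0.
func updateObject(p *pod, index uint64) {
	rv := ""
	if index != 0 {
		rv = strconv.FormatUint(index, 10)
	}
	p.Metadata.ResourceVersion = rv
}

// decodeFromEtcd decodes a stored value and stamps it with the node's
// modified index, combining the decode and versioning steps of a read.
func decodeFromEtcd(value []byte, modifiedIndex uint64) (*pod, error) {
	var p pod
	if err := json.Unmarshal(value, &p); err != nil {
		return nil, err
	}
	updateObject(&p, modifiedIndex)
	return &p, nil
}

func main() {
	stored := []byte(`{"metadata":{"name":"web-1"}}`) // no resourceVersion in etcd
	p, err := decodeFromEtcd(stored, 1042)
	if err != nil {
		panic(err)
	}
	out, _ := json.Marshal(p)
	fmt.Println(string(out)) // {"metadata":{"name":"web-1","resourceVersion":"1042"}}
}
```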
Summary
This article only outlines the overall list-watch flow. Many details were not covered and will be filled in gradually in later articles.