引子

之前,我们分析过Kubernetes的reflect机制。reflector可以消费watch channel中的内容,并存储到store中。本次分析就将介绍Kubernetes的listwatch机制的主要调用流程,将按下面两部分进行分析:

  1. 组件向apiserver请求;
  2. apiserver向ETCD请求。

首先,介绍组件向apiserver人listwatch请求。让我们来看个例子,在/pkg/kubelet/config/apiserver.go中:

1
2
3
4
5
6
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c *clientset.Clientset, nodeName types.NodeName, updates chan<- interface{}) {
//***创建apiserver的listwatch***//
lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", api.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}

可以很明显看出,listwatch可以使用lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", api.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))生成。
所以,先来看ListWatch的定义。

ListWatch

ListWatch定义在/pkg/client/cache/listwatch.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
//***ListerWatcher接口,只要有List()和Watch()即可***//
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (watch.Interface, error)
}
// ListFunc knows how to list resources
type ListFunc func(options api.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(options api.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
}

所以,只要实现了List()和Watch()的结构体都可以称为ListerWatcher。而ListWatch结构体内刚好有ListFunc和WatchFunc成员,也实现了List()和Watch():

1
2
3
4
5
6
7
8
9
10
11
// List a set of apiserver resources
//***执行List操作***//
func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) {
return lw.ListFunc(options)
}
// Watch a set of apiserver resources
//***执行Watch操作***//
func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}

List()和Watch()分别调用了ListFunc()和WatchFunc()。再来看ListWatch的生成方法,即上面提到的NewListWatchFromClient():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
//***使用client的方法来组建ListWatch,并返回***//
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
listFunc := func(options api.ListOptions) (runtime.Object, error) {
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(fieldSelector).
Do().
Get()
}
watchFunc := func(options api.ListOptions) (watch.Interface, error) {
return c.Get().
Prefix("watch").
Namespace(namespace).
Resource(resource).
VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(fieldSelector).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

可以看出,NewListWatchFromClient()会依据参数来构造listFunc和watchFunc,其中参数c可以为c.Core().RESTClient()。
所以,接下来来看RESTClient。

RESTClient

RESTClient在”kubernetes-client分析(二)-restclient-v1.5.2”进行了分析,但未对watch进行详细介绍。所以,在该小节中会具体介绍watch()方法,定义在/pkg/client/restclient/request.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
//***Watch实现***//
func (r *Request) Watch() (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.throttle here.
if r.err != nil {
return nil, r.err
}
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return nil, err
}
req.Header = r.headers
client := r.client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
}
}
if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) {
return watch.NewEmptyWatch(), nil
}
return nil, err
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
if result := r.transformResponse(resp, req); result.err != nil {
return nil, result.err
}
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
}
framer := r.serializers.Framer.NewFrameReader(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
//***返回的是StreamWatcher***//
return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil
}

可以看到,代码先执行请求,然后把请求体的response封装成decoder,然后调用watch.NewStreamWatcher()来生成一个stream watcher。

StreamWatcher()

StreamWatcher定义在/pkg/watch/streamwatcher.go中:

1
2
3
4
5
6
7
8
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
sync.Mutex
source Decoder
result chan Event
stopped bool
}

StreamWatcher可以从source中获取event,并存储到result chan中。来看StreamWatch的生成函数:

1
2
3
4
5
6
7
8
9
10
11
12
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder) *StreamWatcher {
sw := &StreamWatcher{
source: d,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
}
go sw.receive()
return sw
}

NewStreamWatcher()在生成StreamWAtcher后,会启动receive()。receive()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
//***在defer中关闭result channel***//
defer close(sw.result)
defer sw.Stop()
defer utilruntime.HandleCrash()
for {
action, obj, err := sw.source.Decode()
//***如果发生错误,则直接返回***//
if err != nil {
// Ignore expected error.
if sw.stopping() {
return
}
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
default:
msg := "Unable to decode an event from the watch stream: %v"
if net.IsProbableEOF(err) {
glog.V(5).Infof(msg, err)
} else {
glog.Errorf(msg, err)
}
}
return
}
//***把Event放入result***//
sw.result <- Event{
Type: action,
Object: obj,
}
}
}

在receive()中,先从source中解码出obj,然后把obj封装成Event,并放入到result channel中。
现在result channel中已经有来自apiserver的Event,那么还得定义一个方法返回result channel供调用方消费:

1
2
3
4
// ResultChan implements Interface.
func (sw *StreamWatcher) ResultChan() <-chan Event {
return sw.result
}

ResultChan()就是用来返回StreamWatcher的result channel。每个watcher结构体都会定义ResultChan()用来返回result channel。
关于如何从stream中获取一个obj,以后分析,其最后也是调用了json包。json包解析stream中的数据详见”kubernetes-watcher-demo”。

现在组件已经各apiserver建立了联接,并对联接中数据流中的数据进行解析。所以接下来看apiserver是如何进行watch响应的。

apiserver

apiserver的listwatch由ListResource() resthandler承担,定义在/pkg/apiserver/resthandler.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
// For performance tracking purposes.
trace := util.NewTrace("List " + req.Request.URL.Path)
w := res.ResponseWriter
//***获取namespace***//
namespace, err := scope.Namer.Namespace(req)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
// Watches for single objects are routed to this function.
// Treat a /name parameter the same as a field selector entry.
hasName := true
_, name, err := scope.Namer.Name(req)
if err != nil {
hasName = false
}
ctx := scope.ContextFunc(req)
//***设置namespace key***//
ctx = api.WithNamespace(ctx, namespace)
opts := api.ListOptions{}
if err := scope.ParameterCodec.DecodeParameters(req.Request.URL.Query(), scope.Kind.GroupVersion(), &opts); err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
// transform fields
// TODO: DecodeParametersInto should do this.
if opts.FieldSelector != nil {
fn := func(label, value string) (newLabel, newValue string, err error) {
return scope.Convertor.ConvertFieldLabel(scope.Kind.GroupVersion().String(), scope.Kind.Kind, label, value)
}
if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
// TODO: allow bad request to set field causes based on query parameters
err = errors.NewBadRequest(err.Error())
scope.err(err, res.ResponseWriter, req.Request)
return
}
}
if hasName {
// metadata.name is the canonical internal name.
// SelectionPredicate will notice that this is
// a request for a single object and optimize the
// storage query accordingly.
nameSelector := fields.OneTermEqualSelector("metadata.name", name)
if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
// It doesn't make sense to ask for both a name
// and a field selector, since just the name is
// sufficient to narrow down the request to a
// single object.
scope.err(errors.NewBadRequest("both a name and a field selector provided; please provide one or the other."), res.ResponseWriter, req.Request)
return
}
opts.FieldSelector = nameSelector
}
if (opts.Watch || forceWatch) && rw != nil {
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
// TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=.
timeout := time.Duration(0)
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
//***设置timeout***//
if timeout == 0 && minRequestTimeout > 0 {
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
}
serveWatch(watcher, scope, req, res, timeout)
return
}
// Log only long List requests (ignore Watch).
defer trace.LogIfLong(500 * time.Millisecond)
trace.Step("About to List from storage")
//***获取list result***//
result, err := r.List(ctx, &opts)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
trace.Step("Listing from storage done")
numberOfItems, err := setListSelfLink(result, req, scope.Namer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
trace.Step("Self-linking done")
write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems))
}
}

ListResource()中包含了list操作和watch操作。list操作比较简单,直接调用storage的List()。watch操作主要调用了serveWatch(),还定义了超时时间,定义在/pkg/apiserver/watch.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// serveWatch handles serving requests to the server
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
//***Fankang***//
//***Watch()实现函数***//
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
// negotiate for the stream serializer
serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
framer := serializer.StreamSerializer.Framer
streamSerializer := serializer.StreamSerializer.Serializer
embedded := serializer.Serializer
if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), res.ResponseWriter, req.Request)
return
}
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
// find the embedded serializer matching the media type
embeddedEncoder := scope.Serializer.EncoderForVersion(embedded, scope.Kind.GroupVersion())
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType
if mediaType != runtime.ContentTypeJSON {
mediaType += ";stream=watch"
}
server := &WatchServer{
watching: watcher,
scope: scope,
useTextFraming: useTextFraming,
mediaType: mediaType,
framer: framer,
encoder: encoder,
embeddedEncoder: embeddedEncoder,
fixup: func(obj runtime.Object) {
if err := setSelfLink(obj, req, scope.Namer); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
}
},
t: &realTimeoutFactory{timeout},
}
server.ServeHTTP(res.ResponseWriter, req.Request)
}

serveWatch()主要调用了ServeHTTP():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
// or over a websocket connection.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w = httplog.Unlogged(w)
//***WebSocketRequest请求走此通道***//
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.mediaType)
websocket.Handler(s.HandleWS).ServeHTTP(w, req)
return
}
cn, ok := w.(http.CloseNotifier)
if !ok {
err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w)
utilruntime.HandleError(err)
s.scope.err(errors.NewInternalError(err), w, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
utilruntime.HandleError(err)
s.scope.err(errors.NewInternalError(err), w, req)
return
}
framer := s.framer.NewFrameWriter(w)
if framer == nil {
// programmer error
err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType)
utilruntime.HandleError(err)
s.scope.err(errors.NewBadRequest(err.Error()), w, req)
return
}
e := streaming.NewEncoder(framer, s.encoder)
// ensure the connection times out
timeoutCh, cleanup := s.t.TimeoutCh()
defer cleanup()
defer s.watching.Stop()
// begin the stream
w.Header().Set("Content-Type", s.mediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
var unknown runtime.Unknown
internalEvent := &versioned.InternalEvent{}
buf := &bytes.Buffer{}
//***获取watcher的outgoing channel***//
ch := s.watching.ResultChan()
for {
select {
case <-cn.CloseNotify():
return
case <-timeoutCh:
return
case event, ok := <-ch:
//***出错返回***//
if !ok {
// End of results.
return
}
obj := event.Object
s.fixup(obj)
if err := s.embeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return
}
// ContentType is not required here because we are defaulting to the serializer
// type
unknown.Raw = buf.Bytes()
event.Object = &unknown
// the internal event will be versioned by the encoder
*internalEvent = versioned.InternalEvent(event)
if err := e.Encode(internalEvent); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
// client disconnect.
return
}
if len(ch) == 0 {
flusher.Flush()
}
buf.Reset()
}
}
}

ServeHTTP()就是把从storeage的watch result channel获取数据,然后使用streamEncoder把数据发送到连接的数据流中。

genericStore

接着来看genericStore的List()和Watch()的实现,实现位于/pkg/registry/generic/registry/store.go中,先来看List():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// List returns a list of items matching labels and field
//***list操作***//
func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object, error) {
//***处理label***//
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}
//***处理field***//
field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
//***调用ListPredicate()***//
out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
if err != nil {
return nil, err
}
if e.Decorator != nil {
if err := e.Decorator(out); err != nil {
return nil, err
}
}
return out, nil
}

可以看出,List()调用了ListPredicate():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ListPredicate returns a list of all the items matching m.
func (e *Store) ListPredicate(ctx api.Context, p storage.SelectionPredicate, options *api.ListOptions) (runtime.Object, error) {
if options == nil {
// By default we should serve the request from etcd.
options = &api.ListOptions{ResourceVersion: ""}
}
list := e.NewListFunc()
//***调用MatchesSingle()***//
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
//***List***//
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource)
}

ListPredicate()调用了底层storage的List(),其中p为SelectionPredicate可以匹配selector和label。
再来看Watch():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Watch makes a matcher for the given label and field, and calls
// WatchPredicate. If possible, you should customize PredicateFunc to produre a
// matcher that matches by key. SelectionPredicate does this for you
// automatically.
//***Watch()的实现,其中有label的用法,可以学下***//
func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}
field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
resourceVersion := ""
if options != nil {
resourceVersion = options.ResourceVersion
}
//***调用WatchPredicate()***//
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
}

Watch()主要调用了WatchPredicate():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// WatchPredicate starts a watch for the items that m matches.
func (e *Store) WatchPredicate(ctx api.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
if err != nil {
return nil, err
}
//***Watch单个元素***//
w, err := e.Storage.Watch(ctx, key, resourceVersion, p)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
//***watch list***//
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}

WatchPredicate()会判断监听的对象,如果是单个元素,那么调用的是底层storage的Watch(),否则调用WatchList()。

storage

现在来看底层storage的List(), Watch()和WatchList(),定义在/pkg/storeage/etcd/etcd_helper.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Implements storage.Interface.
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
trace := util.NewTrace("List " + getTypeName(listObj))
defer trace.LogIfLong(400 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to list etcd node")
//***执行list操作***//
nodes, index, err := h.listEtcdNode(ctx, key)
trace.Step("Etcd node listed")
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
return nil
}

Watch()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}

WatchList()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Implements storage.Interface.
//***处理WatchList***//
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
//***调用etcd_watcher.go中的etcdWatch()***//
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}

关于etcdwatcher,详见”storage解读(五)-etcdWatcher-v1.5.2”。这里需要注意下,ETCD中存储的obj的metadata是没有resourceVersion这个字段的,但通过kubectl get命令取得的就有,这是因为etcd_helper.go中的Get(),List()等函数调用了APIObjectVersioner的UpdateObject()或UpdateList()函数。

总结

这里只是分析了下listwatch的大概流程,很多细节并未展开分析,会在以后其他文章中逐渐补充。