什么是etcdWatcher

etcdWatcher中封装了etcd的watch操作。etcdWatcher定义在/pkg/storage/etcd/etcd_watcher.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// etcdWatcher converts a native etcd watch to a watch.Interface.
//
// Responses from etcd arrive on etcdIncoming (filled by etcdWatch), are
// consumed by the translate goroutine, and leave as watch.Events on outgoing,
// which ResultChan exposes to callers.
type etcdWatcher struct {
// HighWaterMarks for performance debugging.
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
incomingHWM storage.HighWaterMark
outgoingHWM storage.HighWaterMark
// encoding is the codec handed to newEtcdWatcher.
// NOTE(review): presumably used to decode etcd node values into API
// objects; the decoding code is not part of this excerpt - confirm.
encoding runtime.Codec
// Note that versioner is required for etcdWatcher to work correctly.
// There is no public constructor of it, so be careful when manipulating
// with it manually.
versioner storage.Versioner
// NOTE(review): transform, include and filter are only stored in the code
// shown here; their use sites (presumably sendAdd/sendModify/sendDelete)
// are outside this excerpt - confirm their exact roles there.
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, should be true
include includeFunc
filter storage.FilterFunc
// etcdIncoming carries raw etcd responses from etcdWatch to translate.
// etcdWatch closes it when it returns.
etcdIncoming chan *etcd.Response
// etcdError carries an error from etcdWatch to translate. etcdWatch closes
// it when it returns, which translate treats as the end of the watch.
etcdError chan error
// ctx and cancel start out nil and are set by etcdWatch through
// context.WithCancel; ctx is the context passed to watcher.Next.
ctx context.Context
cancel context.CancelFunc
// NOTE(review): etcdCallEnded is not referenced in the code shown here.
etcdCallEnded chan struct{}
// outgoing carries the translated events; it is returned by ResultChan and
// closed by translate when translate exits.
outgoing chan watch.Event
// userStop makes emit and translate give up once it becomes readable.
// NOTE(review): presumably closed by Stop(), which is not shown here.
userStop chan struct{}
// stopped is read by etcdWatch while holding stopLock.
stopped bool
stopLock sync.Mutex
// wg is used to avoid calls to etcd after Stop(), and to make sure
// that the translate goroutine is not leaked.
wg sync.WaitGroup
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
// NOTE(review): cache is only stored in the code shown here - confirm its
// use against the send* helpers.
cache etcdCache
}

etcdWatcher中主要包含:

  • encoding: json转换成具体类型时用;
  • versioner:管理结构体值中的resourceVersion字段用;
  • list:标明是否需要递归操作,如果watch的是一个目录,则list为true;
  • quorum:标明是否是raft读,如果quorum为false,则允许本地读;
  • etcdIncoming:etcdIncoming是etcdWatcher的入口;
  • outgoing:outgoing是etcdWatcher的出口;
  • emit:Event的处理函数。

通常watcher都会把incoming中的数据经过处理后发送到outgoing中。etcdWatcher也是如此。

newEtcdWatcher()

newEtcdWatcher()可以生成一个新的etcdWatcher,其中emit函数的作用是把Event放入到outgoing中。newEtcdWatcher()会启动translate() loop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// newEtcdWatcher builds an etcdWatcher and starts its translate goroutine
// before returning it. If list is true, the watcher observes sub-nodes too.
// The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(
	list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
	encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
	cache etcdCache) *etcdWatcher {
	// The incoming channel is buffered so that the etcd client is not forced
	// to context switch with every object it gets, and so that a long time
	// spent decoding one object does not block the *next* one. We see a lot
	// of "401 window exceeded" errors from etcd, caused by the client getting
	// results one at a time instead of streaming them, so we want to never
	// block the etcd client if possible. The 100 is mostly arbitrary -- we
	// know it goes as high as 50, though. A log message reports the queue
	// length so the real usage of this buffer can be monitored.
	incoming := make(chan *etcd.Response, 100)
	// As with the incoming channel, avoid forcing a context switch for every
	// event that is sent out.
	events := make(chan watch.Event, 100)

	// stopped, wg, ctx and cancel are deliberately left at their zero values
	// (false, an empty WaitGroup, nil and nil).
	w := &etcdWatcher{
		list:         list,
		quorum:       quorum,
		include:      include,
		filter:       filter,
		encoding:     encoding,
		versioner:    versioner,
		transform:    transform,
		cache:        cache,
		etcdIncoming: incoming,
		etcdError:    make(chan error, 1),
		outgoing:     events,
		userStop:     make(chan struct{}),
	}

	w.emit = func(e watch.Event) {
		queued := int64(len(w.outgoing))
		if w.outgoingHWM.Update(queued) {
			// Monitor if this gets backed up, and how much.
			glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), queued)
		}
		// Deliver the event unless the user stops the watcher first; without
		// the userStop case a lot of goroutines leak in tests.
		select {
		case <-w.userStop:
		case w.outgoing <- e:
		}
	}

	// translate calls wg.Done when it exits. Add() has to happen here, before
	// the goroutine is launched; otherwise a Stop() that arrives before
	// translate gets started would be a problem.
	w.wg.Add(1)
	go w.translate()
	return w
}

etcdWatch()

etcdWatch()的作用是把从etcd中获取到的内容放入到etcdWatcher的etcdIncoming中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
//***处理Watch***//
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
defer utilruntime.HandleCrash()
// Both channels are closed when this function returns; translate() treats a
// closed etcdError as the end of the watch.
defer close(w.etcdError)
defer close(w.etcdIncoming)
// All calls to etcd are coming from this function - once it is finished
// no other call to etcd should be generated by this watcher.
// done stays a no-op unless the watcher is registered with wg below, in
// which case it becomes w.wg.Done.
done := func() {}
// We need to be prepared, that Stop() can be called at any time.
// It can potentially also be called, even before this function is called.
// If that is the case, we simply skip all the code here.
// See #18928 for more details.
var watcher etcd.Watcher
// returned reports whether this function must exit without entering the
// watch loop (already stopped, or the initial state could not be fetched).
returned := func() bool {
w.stopLock.Lock()
defer w.stopLock.Unlock()
if w.stopped {
// Watcher has already been stopped - don't even initiate it here.
return true
}
// Register this goroutine with wg while stopLock is still held.
w.wg.Add(1)
done = w.wg.Done
// Perform initialization of watcher under lock - we want to avoid situation when
// Stop() is called in the meantime (which in tests can cause etcd termination and
// strange behavior here).
if resourceVersion == 0 {
// No starting version was given: ask etcdGetInitialWatchState for the
// version to watch from. It is also handed w.etcdIncoming.
// NOTE(review): presumably it pushes the current state into that
// channel; the helper is not shown here - confirm.
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
if err != nil {
w.etcdError <- err
return true
}
resourceVersion = latest
}
opts := etcd.WatcherOptions{
// When the watch targets a directory, Kubernetes sees it as a "list"
// while etcd sees it as a recursive watch (Recursive is true).
Recursive: w.list,
AfterIndex: resourceVersion,
}
watcher = client.Watcher(key, &opts)
// Derive a cancellable context; it is stored on the watcher and used for
// every watcher.Next call below.
w.ctx, w.cancel = context.WithCancel(ctx)
return false
}()
// done is evaluated at this point, i.e. after the closure above has run, so
// this defers w.wg.Done whenever wg.Add(1) was called.
defer done()
if returned {
return
}
// This is where the watch actually happens: each response from the etcd
// client is pushed to w.etcdIncoming; the first error is forwarded on
// w.etcdError and ends the loop.
for {
resp, err := watcher.Next(w.ctx)
if err != nil {
w.etcdError <- err
return
}
w.etcdIncoming <- resp
}
}

translate()

translate()方法在newEtcdWatcher()中自动拉起,主要作用是消费etcdIncoming中的内容,并调用sendResult()进行处理;同时它还会处理etcdError中的错误,把错误包装成watch.Error类型的Event发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// translate drains w.etcdIncoming and w.etcdError until the watch ends.
//
// Every response received on etcdIncoming is handed to sendResult. A non-nil
// error received on etcdError is emitted as a watch.Error event carrying an
// unversioned.Status, after which translate returns; a nil error (closed
// channel) or a readable userStop makes it return silently. On exit it closes
// w.outgoing and calls w.wg.Done.
func (w *etcdWatcher) translate() {
	defer w.wg.Done()
	defer close(w.outgoing)
	defer utilruntime.HandleCrash()

	for {
		select {
		case <-w.userStop:
			return

		case err := <-w.etcdError:
			if err == nil {
				return
			}
			expired := etcdutil.IsEtcdWatchExpired(err)
			// Start from the generic internal-error status and specialise it
			// for an expired watch.
			// TODO: need to generate errors using api/errors which has a circular dependency on this package
			// no other way to inject errors
			// case etcdutil.IsEtcdUnreachable(err):
			//   status = errors.NewServerTimeout(...)
			status := &unversioned.Status{
				Status:  unversioned.StatusFailure,
				Message: err.Error(),
				Code:    http.StatusInternalServerError,
				Reason:  unversioned.StatusReasonInternalError,
			}
			if expired {
				status.Code = http.StatusGone // Gone
				status.Reason = unversioned.StatusReasonExpired
			}
			w.emit(watch.Event{Type: watch.Error, Object: status})
			return

		case resp, open := <-w.etcdIncoming:
			if !open {
				// Do not return here -- keep looping until the etcdError
				// channel gives an error or is closed.
				continue
			}
			depth := int64(len(w.etcdIncoming))
			if w.incomingHWM.Update(depth) {
				// Monitor if this gets backed up, and how much.
				glog.V(1).Infof("watch: %v objects queued in incoming channel.", depth)
			}
			// Convert the etcd response and forward it to the outgoing side.
			w.sendResult(resp)
		}
	}
}

sendResult()

sendResult()根据etcdEvent的动作类型,分别调用sendAdd(), sendModify(), sendDelete()进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
// sendResult routes an etcd response to sendAdd, sendModify or sendDelete
// according to its action. An action that matches none of the known constants
// is reported through utilruntime.HandleError and nothing is sent.
func (w *etcdWatcher) sendResult(res *etcd.Response) {
	action := res.Action
	switch {
	case action == EtcdCreate || action == EtcdGet:
		w.sendAdd(res)
	case action == EtcdSet || action == EtcdCAS:
		w.sendModify(res)
	case action == EtcdDelete || action == EtcdExpire || action == EtcdCAD:
		w.sendDelete(res)
	default:
		utilruntime.HandleError(fmt.Errorf("unknown action: %v", action))
	}
}

sendAdd(), sendModify(), sendDelete()

sendAdd(), sendModify(), sendDelete()把etcdEvent转换成kubernetes中流通的Event。然后调用emit()把Event放入到outgoing中。

ResultChan()

ResultChan()返回etcdWatcher中的outgoing channel。

1
2
3
4
// ResultChan returns w.outgoing, the channel on which the watcher's events are
// sent, as a receive-only channel.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
}

总结

etcdWatcher就是把etcdEvent放入到etcdIncoming中,这个过程由etcdWatch()方法完成;然后把etcdIncoming中的内容经过处理放入到outgoing中,这个过程由translate()完成。
etcdWatcher的使用也很简单,可以参照/pkg/storage/etcd/etcd_helper.go中的内容:

1
2
// Build a watcher with list=false (sub-nodes are not watched), no include
// function, a filter built from pred by storage.SimpleFilter, no transform,
// and h as the cache. newEtcdWatcher already starts the translate goroutine.
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
// Run the etcd watch loop in its own goroutine; watchRV is passed as the
// resourceVersion to start watching from.
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)