什么是etcdWatcher
etcdWatcher中封装了etcd的watch操作。etcdWatcher定义在/pkg/storage/etcd/etcd_watcher.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| type etcdWatcher struct { incomingHWM storage.HighWaterMark outgoingHWM storage.HighWaterMark encoding runtime.Codec versioner storage.Versioner transform TransformFunc list bool quorum bool include includeFunc filter storage.FilterFunc etcdIncoming chan *etcd.Response etcdError chan error ctx context.Context cancel context.CancelFunc etcdCallEnded chan struct{} outgoing chan watch.Event userStop chan struct{} stopped bool stopLock sync.Mutex wg sync.WaitGroup emit func(watch.Event) cache etcdCache }
|
etcdWatcher中主要包含:
- encoding: json转换成具体类型时用;
- versioner:管理结构体值中的resourceVersion字段用;
- list:标明是否需要递归操作,如果watch的是一个目录,则list为true;
- quorum:标明是否是raft读,如果quorum为false,则允许本地读;
- etcdIncoming:etcdIncoming是etcdWatcher的入口;
- outgoing:outgoing是etcdWatcher的出口;
- emit:Event的处理函数。
通常watcher都会把incoming中的数据经过处理后发送到outgoing中。etcdWatcher也是如此。
newEtcdWatcher()
newEtcdWatcher()可以生成一个新的etcdWatcher,其中emit函数的作用是把Event放入到outgoing中。newEtcdWatcher()会启动translate() loop。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| func newEtcdWatcher( list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, transform: transform, list: list, quorum: quorum, include: include, filter: filter, etcdIncoming: make(chan *etcd.Response, 100), etcdError: make(chan error, 1), outgoing: make(chan watch.Event, 100), userStop: make(chan struct{}), stopped: false, wg: sync.WaitGroup{}, cache: cache, ctx: nil, cancel: nil, } w.emit = func(e watch.Event) { if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) { glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen) } select { case w.outgoing <- e: case <-w.userStop: } } w.wg.Add(1) go w.translate() return w }
|
etcdWatch()
etcdWatch()的作用是把etcd中获取到内容放入到etcdWatcher的etcdIncoming中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| / etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // as a goroutine. //***处理Watch***// func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) { defer utilruntime.HandleCrash() defer close(w.etcdError) defer close(w.etcdIncoming) // All calls to etcd are coming from this function - once it is finished // no other call to etcd should be generated by this watcher. done := func() {} // We need to be prepared, that Stop() can be called at any time. // It can potentially also be called, even before this function is called. // If that is the case, we simply skip all the code here. // See #18928 for more details. var watcher etcd.Watcher returned := func() bool { w.stopLock.Lock() defer w.stopLock.Unlock() if w.stopped { // Watcher has already been stopped - don't event initiate it here. return true } w.wg.Add(1) done = w.wg.Done if resourceVersion == 0 { latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming) if err != nil { w.etcdError <- err return true } resourceVersion = latest } opts := etcd.WatcherOptions{ Recursive: w.list, AfterIndex: resourceVersion, } watcher = client.Watcher(key, &opts) w.ctx, w.cancel = context.WithCancel(ctx) return false }() defer done() if returned { return } for { resp, err := watcher.Next(w.ctx) if err != nil { w.etcdError <- err return } w.etcdIncoming <- resp } }
|
translate()
translate()方法在newEtcdWatcher()中自动拉起,主要作用消费etcdIncoming中的内容,并调用sendResult()进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| func (w *etcdWatcher) translate() { defer w.wg.Done() defer close(w.outgoing) defer utilruntime.HandleCrash() for { select { case err := <-w.etcdError: if err != nil { var status *unversioned.Status switch { case etcdutil.IsEtcdWatchExpired(err): status = &unversioned.Status{ Status: unversioned.StatusFailure, Message: err.Error(), Code: http.StatusGone, Reason: unversioned.StatusReasonExpired, } default: status = &unversioned.Status{ Status: unversioned.StatusFailure, Message: err.Error(), Code: http.StatusInternalServerError, Reason: unversioned.StatusReasonInternalError, } } w.emit(watch.Event{ Type: watch.Error, Object: status, }) } return case <-w.userStop: return case res, ok := <-w.etcdIncoming: if ok { if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) { glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen) } w.sendResult(res) } } } }
|
sendResult()
sendResult()根据etcdEvent的动作类型,分别调用sendAdd(), sendModify(), sendDelete()进行处理。
1 2 3 4 5 6 7 8 9 10 11 12
| func (w *etcdWatcher) sendResult(res *etcd.Response) { switch res.Action { case EtcdCreate, EtcdGet: w.sendAdd(res) case EtcdSet, EtcdCAS: w.sendModify(res) case EtcdDelete, EtcdExpire, EtcdCAD: w.sendDelete(res) default: utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action)) } }
|
sendAdd(), sendModify(), sendDelete()
sendAdd(), sendModify(), sendDelete()把etcdEvent转换成kubernetes中流通的Event。然后调用emit()把Event放入到outgoing中。
ResultChan()
ResultChan()返回etcdWatcher中的outgoing channel。
1 2 3 4
| func (w *etcdWatcher) ResultChan() <-chan watch.Event { return w.outgoing }
|
总结
etcdWatcher就是把etcdEvent放入到etcdIncoming中,这个过程由etcdWatch()方法完成;然后把etcdIncoming中的内容结过处理放入到outgoing中,这个过程由translate()完成。
etcdWatch()的使用也很简单,可以参照/pkg/storage/etcd/etcd_helper.go中的内容:
1 2
| w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|