什么是ETCD Helper
ETCD Helper是对ETCD Client的封装。目前Kubernetes有ETCD2 Helper和ETCD3 Helper,当然我们只分析ETCD2 Helper。
factory
上次分析中我们提到NewRawStorage()直接调用factory.Create()来生成底层的key-value存储。这里的factory就是ETCD的入口,定义在/pkg/storage/storagebackend/factory/factory.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { switch c.Type { case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2: return newETCD2Storage(c) case storagebackend.StorageTypeETCD3: return newETCD3Storage(c) default: return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) } }
|
factory的Create()函数会根据config中的配置调用newETCD2Storage()或newETCD3Storage(c)生成ETCD Helper。来看newETCD3Storage(),定义在/pkg/storage/storagebackend/factory/etcd2.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile) if err != nil { return nil, nil, err } client, err := newETCD2Client(tr, c.ServerList) if err != nil { return nil, nil, err } s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize) return s, tr.CloseIdleConnections, nil } func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) { cli, err := etcd2client.New(etcd2client.Config{ Endpoints: serverList, Transport: tr, }) if err != nil { return nil, err } return cli, nil }
|
newETCD2Storage()会调用newETCD2Client()生成ETCD Client,然后通过etcd.NewEtcdStorage()把ETCD Client封装成ETCD Helper,即EtcdStoage。
ETCD Helper
ETCD Helper定义在/pkg/storage/etcd/etcd_helper.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| type etcdHelper struct { etcdMembersAPI etcd.MembersAPI etcdKeysAPI etcd.KeysAPI codec runtime.Codec copier runtime.ObjectCopier versioner storage.Versioner pathPrefix string quorum bool cache utilcache.Cache }
|
其中:
- etcdMembersAPI表示ETCD member相关操作的client;
- etcdKeysAPI表示ETCD key相关操作的client;
- codec表示编码/解码器;
- copier表示复制器;
- versioner表示resourceVersion管理器,将在下次分析时介绍;
- pathPrefix表示存储路径;
- quorum表示ETCD读数据的方式,如果为true,则使用raft;否则从本地数据读。默认为false。
NewEtcdStorage()
从NewEtcdStorage()可以看出,etcdStorage本质上就是ETCD Helper。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface { return &etcdHelper{ etcdMembersAPI: etcd.NewMembersAPI(client), etcdKeysAPI: etcd.NewKeysAPI(client), codec: codec, versioner: APIObjectVersioner{}, pathPrefix: path.Join("/", prefix), quorum: quorum, cache: utilcache.NewCache(cacheSize), } }
|
其他方法
ETCD Helper还实现了Create(), Delete(), Get(), GetToList(), GuaranteedUpdate(), List(), Watch(), WatchList()等方法。
其中:
- Create()会检查object的resourceVersion,如果不为空,则返回错误;然后调用etcdKeysAPI.set()在ETCD中创建key-value;
- Delete()允许传入preconditions参数,preconditions中包含一个UID,如果UID不为空,则该UID和object中的UID相同时才允许删除;调用etcdKeysAPI.Delete()删除ETCD中的key。
- Get()可以获取ETCD中key下的值;
- GetToList()可以把获取到的值以List的形式返回;
- GuaranteedUpdate()会确保参数tryUpdate函数运行成功。一个典型的tryUpdate定义在/pgk/registry/generic/registry/store.go的Update()中,只有resourceVersion一样时,才更新;
- List()对ETCD的目录进行操作以获取目录下的所有value。List()的核心原理调用etcdKeysAPI.Get()时把Recursive参数设置为true;
- Watch()生成一个etcdWatcher,并对key进行watch;
- WatchList()同样生成一个etcdWatcher,并对key下的所有子key进行watch。