什么是ETCD Helper

ETCD Helper是对ETCD Client的封装。目前Kubernetes有ETCD2 Helper和ETCD3 Helper,当然我们只分析ETCD2 Helper。

factory

上次分析中我们提到NewRawStorage()直接调用factory.Create()来生成底层的key-value存储。这里的factory就是ETCD的入口,定义在/pkg/storage/storagebackend/factory/factory.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//***创建etcd sotorage***//
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
return newETCD2Storage(c)
case storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

factory的Create()函数会根据config中的配置调用newETCD2Storage()或newETCD3Storage(c)生成ETCD Helper。来看newETCD3Storage(),定义在/pkg/storage/storagebackend/factory/etcd2.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//***创建etcd2 storage***//
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, nil, err
}
//***生成client***//
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, nil, err
}
//***会把c.Prefix传递下去***//
//***创建etcd2 storage***//
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
return s, tr.CloseIdleConnections, nil
}
//***创建etcd2 client***//
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {
cli, err := etcd2client.New(etcd2client.Config{
Endpoints: serverList,
Transport: tr,
})
if err != nil {
return nil, err
}
return cli, nil
}

newETCD2Storage()会调用newETCD2Client()生成ETCD Client,然后通过etcd.NewEtcdStorage()把ETCD Client封装成ETCD Helper,即EtcdStoage。

ETCD Helper

ETCD Helper定义在/pkg/storage/etcd/etcd_helper.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// etcdHelper is the reference implementation of storage.Interface.
//***和ETCD打交道***//
type etcdHelper struct {
//***etcd member相关操作***//
etcdMembersAPI etcd.MembersAPI
//***etcd key-value相关操作***//
etcdKeysAPI etcd.KeysAPI
codec runtime.Codec
copier runtime.ObjectCopier
// Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually.
// optional, has to be set to perform any atomic operations
//***管理resourceVersion***//
versioner storage.Versioner
// prefix for all etcd keys
pathPrefix string
// if true, perform quorum read
quorum bool
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
// This depends on etcd's indexes being globally unique across all objects/types. This will
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
// support multi-object transaction that will result in many objects with the same index.
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
// TODO: Measure how much this cache helps after the conversion code is optimized.
cache utilcache.Cache
}

其中:

  • etcdMembersAPI表示ETCD member相关操作的client;
  • etcdKeysAPI表示ETCD key相关操作的client;
  • codec表示编码/解码器;
  • copier表示复制器;
  • versioner表示resourceVersion管理器,将在下次分析时介绍;
  • pathPrefix表示存储路径;
  • quorum表示ETCD读数据的方式,如果为true,则使用raft;否则从本地数据读。默认为false。

NewEtcdStorage()

从NewEtcdStorage()可以看出,etcdStorage本质上就是ETCD Helper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
//***quorum默认为false***//
return &etcdHelper{
etcdMembersAPI: etcd.NewMembersAPI(client),
etcdKeysAPI: etcd.NewKeysAPI(client),
codec: codec,
versioner: APIObjectVersioner{},
//***path.Jion()会确保prefix只有一个'/'***//
pathPrefix: path.Join("/", prefix),
quorum: quorum,
cache: utilcache.NewCache(cacheSize),
}
}

其他方法

ETCD Helper还实现了Create(), Delete(), Get(), GetToList(), GuaranteedUpdate(), List(), Watch(), WatchList()等方法。
其中:

  • Create()会检查object的resourceVersion,如果不为空,则返回错误;然后调用etcdKeysAPI.set()在ETCD中创建key-value;
  • Delete()允许传入preconditions参数,preconditions中包含一个UID,如果UID不为空,则该UID和object中的UID相同时才允许删除;调用etcdKeysAPI.Delete()删除ETCD中的key。
  • Get()可以获取ETCD中key下的值;
  • GetToList()可以把获取到的值以List的形式返回;
  • GuaranteedUpdate()会确保参数tryUpdate函数运行成功。一个典型的tryUpdate定义在/pgk/registry/generic/registry/store.go的Update()中,只有resourceVersion一样时,才更新;
  • List()对ETCD的目录进行操作以获取目录下的所有value。List()的核心原理调用etcdKeysAPI.Get()时把Recursive参数设置为true;
  • Watch()生成一个etcdWatcher,并对key进行watch;
  • WatchList()同样生成一个etcdWatcher,并对key下的所有子key进行watch。