Store和Indexer
Store和Indexer 是Kubernetes用来存储obj的结构体,与reflector机制配合使用,可以把ETCD中的变化同步到Store和Indexer中,在代码中却可以直接从Store和Indexer获取相应的对象。所以说,Store和Indexer是ETCD中内容在内存中的一个缓存。
先来看下什么是Store。只要定义了Add(), Delete(), Update(), Get()等方法的结构体都可以称为Store。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error }
|
Indexer是在Store的基础上增加了index func管理的功能。具体稍后分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
|
Store和Indexer的生成函数定义在/pkg/client/cache/store.go中。可以看出,Store和Indexer都是一个cache,其本质都是一个threadSafeStore。不同的是Store的Indexers参数为空,而Indexer的Indexers参数有值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func NewStore(keyFunc KeyFunc) Store { return &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } } func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
|
Index, Indexers和Indices
在分析threadSafeStore之前,需要介绍Index, Indexers和Indices三个重量级的概念。
- Index: 为key到obj set的map。可以通过key查找到对应的obj。Key使用keyFunc对obj计算得来。
- Indexers: 为name到keyFunc的map。可以通过某名字,找到keyFunc。
- Indices:为name到Index的map。可以通过某名字,找到Index。
1 2 3 4 5 6 7 8 9 10 11
| type Index map[string]sets.String type Indexers map[string]IndexFunc type Indices map[string]Index
|
所以,可以通过一个比较人性化的名字,找到一个keyFunc及一个Index,通过keyFunc计算出obj的key,然后在Index中通过key找到obj。当然,不同的obj通过keyFunc可能计算出同一个key,这时候,这些obj都放在Index的同一key下。
现在有些明了这三个结构体的作用了,用keyFunc对很多obj进行分类存储,以方便存取。如keyFunc返回的是obj的namespace,那么我们就可以获取某namespace下的资源了。 所以keyFunc在系统中也称IndexFunc。
以上的很多命名是为了方便理解,现在为了和cache中的keyFunc进行区分,以后我们统一约定称为IndexFunc,其计算出来的值为indexValue。
ThreadSafeStore
ThreadSafeStore是一个interface, 定义在/pkg/client/cache/thread_safe_store.go中,是底层具体负责对象存储的结构体,其具体由threadSafeMap实现。先看来threadSafeMap的定义,如右边代码所示,threadSafeMap包含一个indexers及一个Indices,还有一个items。在ThreadSafeStore中,Index存放的并不是obj,而是obj的key。而items中存入的就是key和obj。所以说,在ThreadSafeStore中的Index只负责obj的key的管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} indexers Indexers indices Indices }
|
Add()
Add()先更新items map中的值,然后调用updateIndices()更新index。
1 2 3 4 5 6 7 8
| func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) }
|
Update()
同Add()。
1 2 3 4 5 6 7
| func (c *threadSafeMap) Update(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) }
|
Delete()
Delete()先把key从index中删除,然后再把key从items中删除。
1 2 3 4 5 6 7 8
| func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.deleteFromIndices(obj, key) delete(c.items, key) } }
|
Get()
Get()依据key从items中获取item,然后返回。
1 2 3 4 5 6
| func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) { c.lock.RLock() defer c.lock.RUnlock() item, exists = c.items[key] return item, exists }
|
List()
List()返回items map中的所有value。
1 2 3 4 5 6 7 8 9
| func (c *threadSafeMap) List() []interface{} { c.lock.RLock() defer c.lock.RUnlock() list := make([]interface{}, 0, len(c.items)) for _, item := range c.items { list = append(list, item) } return list }
|
ListKeys()
ListKeys()返回items map中的所有key。
1 2 3 4 5 6 7 8 9 10 11
| func (c *threadSafeMap) ListKeys() []string { c.lock.RLock() defer c.lock.RUnlock() list := make([]string, 0, len(c.items)) for key := range c.items { list = append(list, key) } return list }
|
Replace()
Replace()重新构建items和index。
1 2 3 4 5 6 7 8 9 10 11
| func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) { c.lock.Lock() defer c.lock.Unlock() c.items = items c.indices = Indices{} for key, item := range c.items { c.updateIndices(nil, item, key) } }
|
Index()
Index()返回先使用indexName获取indexFunc,然后计算出obj的indexValue,再取出indexValue对应的keys,最后依据keys从items中获取对应的obj并返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } indexKeys, err := indexFunc(obj) if err != nil { return nil, err } index := c.indices[indexName] returnKeySet := sets.String{} for _, indexKey := range indexKeys { set := index[indexKey] for _, key := range set.List() { returnKeySet.Insert(key) } } list := make([]interface{}, 0, returnKeySet.Len()) for absoluteKey := range returnKeySet { list = append(list, c.items[absoluteKey]) } return list, nil }
|
ByIndex()
ByIndex()和Index()类似,不同的是其传入的是indexKey,无需计算。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] list := make([]interface{}, 0, set.Len()) for _, key := range set.List() { list = append(list, c.items[key]) } return list, nil }
|
ListIndexFuncValues()
ListIndexFuncValues()返回indexName对应的index map中的keys。可以理解为indexName分类函数对objs划分的类别。
1 2 3 4 5 6 7 8
| func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string { index := c.indices[indexName] names := make([]string, 0, len(index)) for key := range index { names = append(names, key) } return names }
|
GetIndexers()
GetIndexers()返回indexers。indexers保存indexName和indexFunc的关系。
1 2 3 4 5
| func (c *threadSafeMap) GetIndexers() Indexers { return c.indexers }
|
AddIndexers()
AddIndexers()把indexFunc添加到threadSafeMap中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { c.lock.Lock() defer c.lock.Unlock() if len(c.items) > 0 { return fmt.Errorf("cannot add indexers to running index") } oldKeys := sets.StringKeySet(c.indexers) newKeys := sets.StringKeySet(newIndexers) if oldKeys.HasAny(newKeys.List()...) { return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) } for k, v := range newIndexers { c.indexers[k] = v } return nil }
|
updateIndices()
updateIndices()先把oldObj从索引中删除,然后把newObj的key添加到索引中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error { if oldObj != nil { c.deleteFromIndices(oldObj, key) } for name, indexFunc := range c.indexers { indexValues, err := indexFunc(newObj) if err != nil { return err } index := c.indices[name] if index == nil { index = Index{} c.indices[name] = index } for _, indexValue := range indexValues { set := index[indexValue] if set == nil { set = sets.String{} index[indexValue] = set } set.Insert(key) } } return nil }
|
deleteFromIndices()
deleteFromIndices()把obj对应的key从index中删除。这里感觉有个小问题,index[indexValues]即使为空,indexValues这个key也会存在,当然,这不影响使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error { for name, indexFunc := range c.indexers { indexValues, err := indexFunc(obj) if err != nil { return err } index := c.indices[name] for _, indexValue := range indexValues { if index != nil { set := index[indexValue] if set != nil { set.Delete(key) } } } } return nil }
|
cache
cache定义在/pkg/client/cache/store.go中。Cache是ThreadSafeStore再加上一个keyFunc。所有的obj会先使用keyFunc进行计算得到key,然后把obj和key存入到ThreadSafeStore中。
1 2 3 4 5 6 7 8
| type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }
|
cache的方法
cache的方法比较简单,基本上就是对ThreadSafeStore的封装。详见源代码。
再看Store和Indexer
Store的Indexers为空,所以obj直接存在ThreadSafeStore的items map中,其中key可以使用keyFunc计算。
Indexer的Indexers不为空,所以可以通过Index进行key的分类管理,并进一步地取得具有某一属性的objs。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func NewStore(keyFunc KeyFunc) Store { return &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } } func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
|
举例
结尾来看个例子(例子从test文件中摘出),看下index的具体用法。
testIndexFunc()返回的是pod labels中”foo”对应的值。
在GetIndexFuncValues()中,我们先构造index,其中keyFunc为MetaNamespaceKeyFunc,就是用namespace/name作为key;IndexFunc为testIndexFunc(),名为”testmodes”。然后生成pod1, pod2, pod3。其中pod1和pod2的label “foo”的值为”bar”,pod3的label “foo”的值为”biz”。把pod1, pod2和pod3加入到index中。现在index中”testmodes” indexFunc对应的有”bar”和”biz”两个indexValue,”bar”对应pod1和pod2的key,”biz”对应pod3的key,具体的pod值可以由index的ByIndex()获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package main import ( "fmt" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" ) func testIndexFunc(obj interface{}) ([]string, error) { pod := obj.(*api.Pod) return []string{pod.Labels["foo"]}, nil } func GetIndexFuncValues() { index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"testmodes": testIndexFunc}) pod1 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "one", Labels: map[string]string{"foo": "bar"}}} pod2 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "two", Labels: map[string]string{"foo": "bar"}}} pod3 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "three", Labels: map[string]string{"foo": "biz"}}} index.Add(pod1) index.Add(pod2) index.Add(pod3) keys := index.ListIndexFuncValues("testmodes") for _, key := range keys { fmt.Println("key:", key) items, _ := index.ByIndex("testmodes", key) for _, item := range items { fmt.Println("pod", item.(*api.Pod).ObjectMeta.Name) } } } func main() { GetIndexFuncValues() }
|
结果:
1 2 3 4 5
| key: bar pod one pod two key: biz pod three
|