Store and Indexer

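Store and Indexer are the structures Kubernetes uses to hold objects (objs) in memory. Working together with the Reflector mechanism, they stay in sync with changes in ETCD (which the Reflector receives through the API server's list/watch), so code can fetch the corresponding objects directly from a Store or Indexer. In other words, a Store or Indexer is an in-memory cache of what is in ETCD.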

First, what is a Store? Store is an interface: any type that implements its methods (Add(), Update(), Delete(), Get() and the others below) can be called a Store.

// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
//
// Store makes no assumptions about stored object identity; it is the responsibility
// of a Store implementation to provide a mechanism to correctly key objects and to
// define the contract for obtaining objects by some arbitrary key type.
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
	Resync() error
}
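To make "anything with these methods is a Store" concrete, here is a minimal sketch of a map-backed store. It assumes a trimmed-down interface: miniStore, mapStore, newMapStore and stringKey are hypothetical names for illustration only, and the real Store's List(), Get(), Replace() and Resync() are left out.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// miniStore is a hypothetical, trimmed-down subset of the Store interface above.
type miniStore interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	GetByKey(key string) (item interface{}, exists bool, err error)
	ListKeys() []string
}

// mapStore keeps objects in a map guarded by a RWMutex; keyFunc decides each object's key.
type mapStore struct {
	lock    sync.RWMutex
	items   map[string]interface{}
	keyFunc func(obj interface{}) (string, error)
}

func newMapStore(keyFunc func(obj interface{}) (string, error)) *mapStore {
	return &mapStore{items: map[string]interface{}{}, keyFunc: keyFunc}
}

func (s *mapStore) Add(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}
	s.lock.Lock()
	defer s.lock.Unlock()
	s.items[key] = obj
	return nil
}

// Update overwrites whatever is stored under the object's key, exactly like Add.
func (s *mapStore) Update(obj interface{}) error { return s.Add(obj) }

func (s *mapStore) Delete(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.items, key)
	return nil
}

func (s *mapStore) GetByKey(key string) (interface{}, bool, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()
	item, exists := s.items[key]
	return item, exists, nil
}

// ListKeys returns the keys in sorted order so the output is deterministic.
func (s *mapStore) ListKeys() []string {
	s.lock.RLock()
	defer s.lock.RUnlock()
	keys := make([]string, 0, len(s.items))
	for key := range s.items {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	return keys
}

// Compile-time check: *mapStore satisfies miniStore.
var _ miniStore = (*mapStore)(nil)

// stringKey is a hypothetical keyFunc: string objects serve as their own keys.
func stringKey(obj interface{}) (string, error) {
	s, ok := obj.(string)
	if !ok {
		return "", fmt.Errorf("unexpected type %T", obj)
	}
	return s, nil
}

func main() {
	s := newMapStore(stringKey)
	_ = s.Add("b")
	_ = s.Add("a")
	_ = s.Delete("b")
	fmt.Println(s.ListKeys()) // [a]
}
```

The keyFunc is the only thing that ties an object to its storage slot, which is why Store "makes no assumptions about stored object identity".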

Indexer builds on Store and adds the management of index functions. It is analyzed in detail later.

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
	Store
	// Retrieve list of objects that match on the named indexing function
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// ListIndexFuncValues returns the list of generated values of an Index func
	ListIndexFuncValues(indexName string) []string
	// ByIndex lists object that match on the named indexing function with the exact key
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// GetIndexer return the indexers
	GetIndexers() Indexers
	// AddIndexers adds more indexers to this store. If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

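The constructors for Store and Indexer are defined in /pkg/client/cache/store.go. As the code shows, both return a cache, and every cache is backed by a threadSafeStore. The only difference is that a Store is created with an empty Indexers argument, while an Indexer is created with the indexers passed in by the caller.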

// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

Index, Indexers and Indices

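Before analyzing threadSafeStore, we need to introduce three key concepts: Index, Indexers and Indices.

  • Index: a map from a key to a set of objs; given a key, you can find the corresponding objs. The key is computed from an obj by a keyFunc.
  • Indexers: a map from a name to a keyFunc; given a name, you can find its keyFunc.
  • Indices: a map from a name to an Index; given a name, you can find its Index.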
// Index maps the indexed value to a set of keys in the store that match on that value
//***An Index maps an indexValue to a set of keys***//
type Index map[string]sets.String
// Indexers maps a name to a IndexFunc
//***Indexers maps a name to an IndexFunc, which computes the indexValues***//
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
//***Indices maps a name to an Index***//
type Indices map[string]Index

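So, starting from a human-readable name, we can find a keyFunc and an Index: the keyFunc computes an obj's key, and the Index then maps that key to objs. Different objs may of course produce the same key under a keyFunc; in that case they are all stored under that same key in the Index.

Now the purpose of these three types is clearer: a keyFunc is used to classify many objs so they are easy to store and retrieve. For example, if the keyFunc returns an obj's namespace, we can fetch all resources in a given namespace. That is why such a keyFunc is called an IndexFunc in the code.

Much of the naming above was chosen for ease of understanding. From now on, to distinguish it from the keyFunc used in cache, we will consistently call it IndexFunc, and call the values it computes indexValues.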
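The two-step lookup (name to IndexFunc and Index, then indexValue to keys, then keys to objs) can be sketched with simplified copies of the three types, using the namespace IndexFunc from the paragraph above. Assumptions: sets.String is replaced by map[string]struct{}, and pod, namespaceIndexFunc, addToIndices and byIndex are hypothetical names, not client-go API.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified copies of the three types; sets.String is replaced by map[string]struct{}.
type IndexFunc func(obj interface{}) ([]string, error)
type Index map[string]map[string]struct{} // indexValue -> set of object keys
type Indexers map[string]IndexFunc         // name -> IndexFunc
type Indices map[string]Index              // name -> Index

// pod is a hypothetical stand-in for a Kubernetes object.
type pod struct{ Namespace, Name string }

// namespaceIndexFunc classifies objects by namespace, as described in the text.
func namespaceIndexFunc(obj interface{}) ([]string, error) {
	p, ok := obj.(pod)
	if !ok {
		return nil, fmt.Errorf("unexpected type %T", obj)
	}
	return []string{p.Namespace}, nil
}

// addToIndices follows name -> IndexFunc -> indexValues and records key under each value.
func addToIndices(indexers Indexers, indices Indices, key string, obj interface{}) error {
	for name, fn := range indexers {
		values, err := fn(obj)
		if err != nil {
			return err
		}
		idx := indices[name]
		if idx == nil {
			idx = Index{}
			indices[name] = idx
		}
		for _, v := range values {
			if idx[v] == nil {
				idx[v] = map[string]struct{}{}
			}
			idx[v][key] = struct{}{}
		}
	}
	return nil
}

// byIndex follows name -> Index -> indexValue -> keys -> items (sorted by key).
func byIndex(indices Indices, items map[string]interface{}, name, value string) []interface{} {
	keys := make([]string, 0)
	for key := range indices[name][value] {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	objs := make([]interface{}, 0, len(keys))
	for _, key := range keys {
		objs = append(objs, items[key])
	}
	return objs
}

func main() {
	indexers := Indexers{"namespace": namespaceIndexFunc}
	indices := Indices{}
	items := map[string]interface{}{}
	for _, p := range []pod{{"default", "a"}, {"kube-system", "dns"}, {"default", "b"}} {
		key := p.Namespace + "/" + p.Name // storage key, as a cache keyFunc would compute it
		items[key] = p
		if err := addToIndices(indexers, indices, key, p); err != nil {
			panic(err)
		}
	}
	fmt.Println(byIndex(indices, items, "namespace", "default")) // [{default a} {default b}]
}
```

Note that the sketch already stores keys rather than objs in the Index, which matches how threadSafeMap actually works.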

ThreadSafeStore

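ThreadSafeStore is an interface defined in /pkg/client/cache/thread_safe_store.go. It is the layer that actually stores the objects, and it is implemented by threadSafeMap. Let's first look at the definition of threadSafeMap below: it contains indexers, indices and items. Note that in ThreadSafeStore an Index does not store objs but obj keys, while items maps each key to its obj. In other words, the Index in ThreadSafeStore only manages obj keys.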

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	//***In threadSafeMap, indexers[name] gives the IndexFunc, which computes the obj's indexValues (index keys).***//
	//***indices[name] gives the concrete Index; with an indexValue it yields obj keys, and items[key] yields the obj itself.***//
	//***IndexFunc is used by threadSafeMap.***//
	//***keyFunc is used by cache; it computes the key under which the obj is stored in items.***//
	lock sync.RWMutex
	//***map from key to obj***//
	items map[string]interface{}
	// indexers maps a name to an IndexFunc
	//***type Indexers map[string]IndexFunc***//
	//***the IndexFunc for a given name is looked up in indexers***//
	indexers Indexers
	// indices maps a name to an Index
	//***type Index map[string]sets.String***//
	indices Indices
}

Add()

Add() first updates the value in the items map and then calls updateIndices() to update the indices.

func (c *threadSafeMap) Add(key string, obj interface{}) {
	//***Find the old object (if any), store the new value under the key, then update the indices.***//
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}

Update()

Same as Add().

func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}
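Passing oldObject into updateIndices() is what makes Update() correct when an object's indexed value changes. The sketch below, a simplified lock-free model with hypothetical names (labeled, tsMap, newTSMap, byLabel), updates the same key twice with a different label and checks that the key leaves the old bucket.

```go
package main

import "fmt"

// labeled is a hypothetical object carrying a single label value.
type labeled struct{ Name, Label string }

type indexFunc func(obj interface{}) []string

// tsMap is a simplified, lock-free copy of the Update/updateIndices logic.
type tsMap struct {
	items    map[string]interface{}
	indexers map[string]indexFunc
	indices  map[string]map[string]map[string]bool // name -> indexValue -> set of keys
}

func newTSMap(indexers map[string]indexFunc) *tsMap {
	return &tsMap{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  map[string]map[string]map[string]bool{},
	}
}

// Update replaces the stored object and then re-indexes it.
func (c *tsMap) Update(key string, obj interface{}) {
	old := c.items[key]
	c.items[key] = obj
	c.updateIndices(old, obj, key)
}

func (c *tsMap) updateIndices(oldObj, newObj interface{}, key string) {
	// Drop the key from the buckets computed for the old version first;
	// otherwise it would remain listed under a value the object no longer has.
	if oldObj != nil {
		for name, fn := range c.indexers {
			for _, v := range fn(oldObj) {
				delete(c.indices[name][v], key) // delete on a nil map is a no-op
			}
		}
	}
	for name, fn := range c.indexers {
		if c.indices[name] == nil {
			c.indices[name] = map[string]map[string]bool{}
		}
		for _, v := range fn(newObj) {
			if c.indices[name][v] == nil {
				c.indices[name][v] = map[string]bool{}
			}
			c.indices[name][v][key] = true
		}
	}
}

func byLabel(obj interface{}) []string { return []string{obj.(labeled).Label} }

func main() {
	c := newTSMap(map[string]indexFunc{"label": byLabel})
	c.Update("p1", labeled{"p1", "bar"})
	c.Update("p1", labeled{"p1", "biz"}) // the label changes from bar to biz
	fmt.Println(len(c.indices["label"]["bar"]), len(c.indices["label"]["biz"])) // 0 1
}
```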

Delete()

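Delete() looks up the stored obj, removes its key from the indices (the obj is needed to recompute its indexValues), and then removes the key from items.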

func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if obj, exists := c.items[key]; exists {
		c.deleteFromIndices(obj, key)
		delete(c.items, key)
	}
}

Get()

Get() looks up the item in items by key and returns it.

func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	item, exists = c.items[key]
	return item, exists
}

List()

List() returns all values in the items map.

func (c *threadSafeMap) List() []interface{} {
	c.lock.RLock()
	defer c.lock.RUnlock()
	list := make([]interface{}, 0, len(c.items))
	for _, item := range c.items {
		list = append(list, item)
	}
	return list
}

ListKeys()

ListKeys() returns all keys in the items map.

// ListKeys returns a list of all the keys of the objects currently
// in the threadSafeMap.
func (c *threadSafeMap) ListKeys() []string {
	c.lock.RLock()
	defer c.lock.RUnlock()
	list := make([]string, 0, len(c.items))
	for key := range c.items {
		list = append(list, key)
	}
	return list
}

Replace()

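Replace() swaps in the new items map and rebuilds all indices from scratch. (The resourceVersion argument is not used by this implementation.)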

func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.items = items
	// rebuild any index
	c.indices = Indices{}
	for key, item := range c.items {
		c.updateIndices(nil, item, key)
	}
}
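Because Replace() resets indices before re-indexing, nothing derived from the previous contents survives. A minimal sketch of that behaviour, assuming objects are plain strings indexed by their first letter; tsMap, addToIndices and indexValues are hypothetical names, and locking is omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical simplification: objects are strings, indexed by their first letter.
type indexFunc func(obj string) []string

type tsMap struct {
	items    map[string]string
	indexers map[string]indexFunc
	indices  map[string]map[string]map[string]bool // name -> indexValue -> set of keys
}

// addToIndices is the "add" half of updateIndices (no old object involved).
func (c *tsMap) addToIndices(key, obj string) {
	for name, fn := range c.indexers {
		if c.indices[name] == nil {
			c.indices[name] = map[string]map[string]bool{}
		}
		for _, v := range fn(obj) {
			if c.indices[name][v] == nil {
				c.indices[name][v] = map[string]bool{}
			}
			c.indices[name][v][key] = true
		}
	}
}

// Replace mirrors threadSafeMap.Replace: adopt the new map and rebuild every index
// from scratch, so nothing derived from the previous contents survives.
func (c *tsMap) Replace(items map[string]string) {
	c.items = items
	c.indices = map[string]map[string]map[string]bool{}
	for key, obj := range c.items {
		c.addToIndices(key, obj)
	}
}

// indexValues lists the indexValues of one index in sorted order.
func (c *tsMap) indexValues(name string) []string {
	values := make([]string, 0, len(c.indices[name]))
	for v := range c.indices[name] {
		values = append(values, v)
	}
	sort.Strings(values)
	return values
}

func main() {
	c := &tsMap{indexers: map[string]indexFunc{
		"first": func(obj string) []string { return []string{obj[:1]} },
	}}
	c.Replace(map[string]string{"k1": "apple", "k2": "banana"})
	fmt.Println(c.indexValues("first")) // [a b]
	c.Replace(map[string]string{"k3": "cherry"})
	fmt.Println(c.indexValues("first")) // [c]
}
```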

Index()

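Index() first uses indexName to look up the IndexFunc, computes the obj's indexValues, collects the keys stored under those indexValues (de-duplicated, since one obj may match several values), and finally fetches the corresponding objs from items and returns them.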

// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	//***look up the indexFunc in indexers***//
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
	//***use indexFunc to compute the obj's indexKeys (indexValues)***//
	indexKeys, err := indexFunc(obj)
	if err != nil {
		return nil, err
	}
	index := c.indices[indexName]
	// need to de-dupe the return list. Since multiple keys are allowed, this can happen.
	returnKeySet := sets.String{}
	for _, indexKey := range indexKeys {
		//***use each indexKey to get the matching obj keys from the index***//
		set := index[indexKey]
		for _, key := range set.List() {
			returnKeySet.Insert(key)
		}
	}
	list := make([]interface{}, 0, returnKeySet.Len())
	for absoluteKey := range returnKeySet {
		list = append(list, c.items[absoluteKey])
	}
	return list, nil
}

ByIndex()

ByIndex() is similar to Index(), except that the caller passes in the indexValue (indexKey) directly, so nothing has to be computed.

// ByIndex returns a list of items that match an exact value on the index function
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
	index := c.indices[indexName]
	set := index[indexKey]
	list := make([]interface{}, 0, set.Len())
	for _, key := range set.List() {
		list = append(list, c.items[key])
	}
	return list, nil
}
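The difference between Index() and ByIndex() is easiest to see side by side. The sketch below is a simplified, lock-free model with pre-built contents mirroring the test example at the end of this article; pod, tsMap, collect and newExample are hypothetical names, sets.String is replaced by map[string]bool, and results are sorted by key only so the output is stable.

```go
package main

import (
	"fmt"
	"sort"
)

// pod is a hypothetical object whose only interesting field is label "foo".
type pod struct{ Name, Foo string }

type indexFunc func(obj interface{}) ([]string, error)

// tsMap holds pre-built contents; the locking of the original is left out.
type tsMap struct {
	items    map[string]interface{}
	indexers map[string]indexFunc
	indices  map[string]map[string]map[string]bool // name -> indexValue -> set of keys
}

// ByIndex: the caller already knows the indexValue, so no IndexFunc call is needed.
func (c *tsMap) ByIndex(name, value string) ([]interface{}, error) {
	if c.indexers[name] == nil {
		return nil, fmt.Errorf("index with name %s does not exist", name)
	}
	return c.collect(c.indices[name][value]), nil
}

// Index: compute the indexValues of a sample object, then union the key sets.
// Using a map as the union de-duplicates keys, like returnKeySet in the original.
func (c *tsMap) Index(name string, obj interface{}) ([]interface{}, error) {
	fn := c.indexers[name]
	if fn == nil {
		return nil, fmt.Errorf("index with name %s does not exist", name)
	}
	values, err := fn(obj)
	if err != nil {
		return nil, err
	}
	keys := map[string]bool{}
	for _, v := range values {
		for key := range c.indices[name][v] {
			keys[key] = true
		}
	}
	return c.collect(keys), nil
}

// collect maps a key set to stored objects, sorted by key for stable output.
func (c *tsMap) collect(keys map[string]bool) []interface{} {
	sorted := make([]string, 0, len(keys))
	for key := range keys {
		sorted = append(sorted, key)
	}
	sort.Strings(sorted)
	objs := make([]interface{}, 0, len(sorted))
	for _, key := range sorted {
		objs = append(objs, c.items[key])
	}
	return objs
}

func newExample() *tsMap {
	return &tsMap{
		items: map[string]interface{}{
			"one": pod{"one", "bar"}, "two": pod{"two", "bar"}, "three": pod{"three", "biz"},
		},
		indexers: map[string]indexFunc{
			"testmodes": func(obj interface{}) ([]string, error) { return []string{obj.(pod).Foo}, nil },
		},
		indices: map[string]map[string]map[string]bool{
			"testmodes": {"bar": {"one": true, "two": true}, "biz": {"three": true}},
		},
	}
}

func main() {
	c := newExample()
	viaObj, _ := c.Index("testmodes", pod{"probe", "bar"}) // only the probe's label matters
	viaValue, _ := c.ByIndex("testmodes", "bar")
	fmt.Println(viaObj)   // [{one bar} {two bar}]
	fmt.Println(viaValue) // [{one bar} {two bar}]
}
```

In other words, Index() answers "which objs share this obj's indexValues?", while ByIndex() answers "which objs have this exact indexValue?".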

ListIndexFuncValues()

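ListIndexFuncValues() returns the keys of the Index named indexName, i.e. all of its indexValues. You can think of them as the categories into which that index function divides the objs.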

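func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
	//***Note: unlike Get(), List(), Index() and ByIndex(), this version reads c.indices without taking c.lock***//
	index := c.indices[indexName]
	names := make([]string, 0, len(index))
	for key := range index {
		names = append(names, key)
	}
	return names
}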

GetIndexers()

GetIndexers() returns indexers, which records the mapping from indexName to IndexFunc.

//***Returns all indexers of the cache***//
//***Indexers has type map[string]IndexFunc***//
func (c *threadSafeMap) GetIndexers() Indexers {
	return c.indexers
}

AddIndexers()

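AddIndexers() adds IndexFuncs to threadSafeMap. It refuses to do so once the store already holds items, and it rejects any name that is already registered.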

func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if len(c.items) > 0 {
		return fmt.Errorf("cannot add indexers to running index")
	}
	oldKeys := sets.StringKeySet(c.indexers)
	newKeys := sets.StringKeySet(newIndexers)
	if oldKeys.HasAny(newKeys.List()...) {
		return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
	}
	for k, v := range newIndexers {
		c.indexers[k] = v
	}
	return nil
}
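The two guards in AddIndexers() can be exercised with a small stand-alone model. Assumptions: sets.StringKeySet and Intersection are replaced by a plain loop over maps, and tsMap, addIndexers and noop are hypothetical names. As in the original, the conflict check runs before anything is added, so a conflicting call changes nothing.

```go
package main

import (
	"fmt"
	"sort"
)

type indexFunc func(obj interface{}) ([]string, error)

type tsMap struct {
	items    map[string]interface{}
	indexers map[string]indexFunc
}

// addIndexers follows the same two checks as threadSafeMap.AddIndexers,
// with sets.String replaced by plain maps.
func (c *tsMap) addIndexers(newIndexers map[string]indexFunc) error {
	if len(c.items) > 0 {
		return fmt.Errorf("cannot add indexers to running index")
	}
	var conflicts []string
	for name := range newIndexers {
		if _, ok := c.indexers[name]; ok {
			conflicts = append(conflicts, name)
		}
	}
	if len(conflicts) > 0 {
		sort.Strings(conflicts)
		return fmt.Errorf("indexer conflict: %v", conflicts)
	}
	for name, fn := range newIndexers {
		c.indexers[name] = fn
	}
	return nil
}

func main() {
	noop := func(obj interface{}) ([]string, error) { return nil, nil }
	c := &tsMap{items: map[string]interface{}{}, indexers: map[string]indexFunc{"namespace": noop}}
	fmt.Println(c.addIndexers(map[string]indexFunc{"label": noop}))     // <nil>
	fmt.Println(c.addIndexers(map[string]indexFunc{"namespace": noop})) // indexer conflict: [namespace]
	c.items["default/a"] = "a"
	fmt.Println(c.addIndexers(map[string]indexFunc{"node": noop})) // cannot add indexers to running index
}
```

Refusing new indexers once items exist avoids having to back-fill an index for objects that were stored before it was registered.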

updateIndices()

updateIndices() first removes oldObj from the indices and then adds newObj's key to them.

// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
	// if we got an old object, we need to remove it before we add it again
	//***oldObj must be removed before newObj is added: every obj gets its indexValues from indexFunc(), so the old and new versions may be indexed under different values***//
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
	//***indexers holds the name -> indexFunc mapping***//
	//***NewStore() calls NewThreadSafeStore(Indexers{}, Indices{}); Indexers{} is an empty map, so for a plain Store this loop never runs.***//
	//***A Store does not need it, because its cache already keys objs with KeyFunc***//
	for name, indexFunc := range c.indexers {
		indexValues, err := indexFunc(newObj)
		if err != nil {
			return err
		}
		//***indices holds the name -> index mapping***//
		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}
		//***index holds the indexValue -> keys mapping***//
		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key)
		}
	}
	return nil
}
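Since an IndexFunc returns a slice of indexValues, one obj can sit in several buckets of the same index. The sketch below assumes a hypothetical node type with multiple tags and an IndexFunc byTags that returns all of them; addToIndices is only the "add" half of updateIndices, and indexValues plays the role of ListIndexFuncValues (sorted for stable output). None of these names come from client-go.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical object carrying several tags.
type node struct {
	Name string
	Tags []string
}

type indexFunc func(obj interface{}) ([]string, error)
type index map[string]map[string]bool // indexValue -> set of keys

// byTags returns every tag, so one object can land in several buckets.
func byTags(obj interface{}) ([]string, error) {
	n, ok := obj.(node)
	if !ok {
		return nil, fmt.Errorf("unexpected type %T", obj)
	}
	return n.Tags, nil
}

// addToIndices is the "add" half of updateIndices; removal of an old object is omitted.
func addToIndices(indexers map[string]indexFunc, indices map[string]index, obj interface{}, key string) error {
	for name, fn := range indexers {
		values, err := fn(obj)
		if err != nil {
			return err
		}
		idx := indices[name]
		if idx == nil {
			idx = index{}
			indices[name] = idx
		}
		for _, v := range values {
			if idx[v] == nil {
				idx[v] = map[string]bool{}
			}
			idx[v][key] = true
		}
	}
	return nil
}

// indexValues lists all indexValues of one index, sorted.
func indexValues(idx index) []string {
	out := make([]string, 0, len(idx))
	for v := range idx {
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

// setKeys lists the keys in one bucket, sorted.
func setKeys(set map[string]bool) []string {
	out := make([]string, 0, len(set))
	for k := range set {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func main() {
	indexers := map[string]indexFunc{"tags": byTags}
	indices := map[string]index{}
	_ = addToIndices(indexers, indices, node{"n1", []string{"gpu", "ssd"}}, "n1")
	_ = addToIndices(indexers, indices, node{"n2", []string{"ssd"}}, "n2")
	fmt.Println(indexValues(indices["tags"]))     // [gpu ssd]
	fmt.Println(setKeys(indices["tags"]["ssd"])) // [n1 n2]
}
```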

deleteFromIndices()

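deleteFromIndices() removes the obj's key from every index. There seems to be a minor issue here: even after the set index[indexValue] becomes empty, the indexValue key itself remains in the index. This does not break lookups, since an empty set simply yields no objs, but ListIndexFuncValues() will keep returning such stale indexValues.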

// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
	for name, indexFunc := range c.indexers {
		indexValues, err := indexFunc(obj)
		if err != nil {
			return err
		}
		index := c.indices[name]
		for _, indexValue := range indexValues {
			if index != nil {
				set := index[indexValue]
				if set != nil {
					set.Delete(key)
				}
			}
		}
	}
	return nil
}
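The empty-bucket behaviour described above can be reproduced in a few lines. This sketch assumes string objects indexed by first letter (using the object itself as its key), and compares a copy of the original deletion logic with a hypothetical variant, deleteAndPrune, that also drops a bucket once it is empty; neither function name exists in client-go.

```go
package main

import (
	"fmt"
	"sort"
)

type indexFunc func(obj string) []string
type index map[string]map[string]bool // indexValue -> set of keys

// deleteFromIndex mirrors the original: it removes the key from each bucket
// but leaves an emptied bucket in place.
func deleteFromIndex(idx index, fn indexFunc, obj, key string) {
	for _, v := range fn(obj) {
		if set := idx[v]; set != nil {
			delete(set, key)
		}
	}
}

// deleteAndPrune is a hypothetical variant that also drops emptied buckets.
func deleteAndPrune(idx index, fn indexFunc, obj, key string) {
	for _, v := range fn(obj) {
		if set := idx[v]; set != nil {
			delete(set, key)
			if len(set) == 0 {
				delete(idx, v)
			}
		}
	}
}

// indexValues plays the role of ListIndexFuncValues (sorted for stable output).
func indexValues(idx index) []string {
	out := make([]string, 0, len(idx))
	for v := range idx {
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

func main() {
	first := func(obj string) []string { return []string{obj[:1]} }
	build := func() index { return index{"a": {"apple": true}, "b": {"banana": true}} }

	idx := build()
	deleteFromIndex(idx, first, "banana", "banana")
	fmt.Println(indexValues(idx)) // [a b]  ("b" survives with an empty set)

	idx = build()
	deleteAndPrune(idx, first, "banana", "banana")
	fmt.Println(indexValues(idx)) // [a]
}
```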

cache

cache is defined in /pkg/client/cache/store.go. A cache is a ThreadSafeStore plus a keyFunc: every obj is first passed through keyFunc to compute its key, and then the key and obj are stored in the ThreadSafeStore.

//***cache wraps ThreadSafeStore; it uses KeyFunc to compute each obj's key***//
type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc
}

cache methods

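The methods of cache are simple: they are mostly thin wrappers around ThreadSafeStore that, whenever an obj is passed in, first compute its key with keyFunc. See the source code for details.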
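As a rough picture of that wrapping, here is a self-contained sketch. It is a simplified model, not the client-go source: meta, namespaceKeyFunc, storage and this cache type are hypothetical stand-ins, namespaceKeyFunc only imitates the namespace/name convention of MetaNamespaceKeyFunc, and the real cache.Add wraps keyFunc errors in its own error type rather than returning them unchanged.

```go
package main

import (
	"fmt"
	"sync"
)

// meta is a hypothetical stand-in for an object's ObjectMeta.
type meta struct{ Namespace, Name string }

// namespaceKeyFunc imitates the namespace/name convention of MetaNamespaceKeyFunc:
// objects without a namespace are keyed by name alone.
func namespaceKeyFunc(obj interface{}) (string, error) {
	m, ok := obj.(meta)
	if !ok {
		return "", fmt.Errorf("object has no meta: %v", obj)
	}
	if m.Namespace != "" {
		return m.Namespace + "/" + m.Name, nil
	}
	return m.Name, nil
}

// storage stands in for ThreadSafeStore: it only deals in explicit keys.
type storage struct {
	lock  sync.RWMutex
	items map[string]interface{}
}

func (s *storage) Add(key string, obj interface{}) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.items[key] = obj
}

func (s *storage) Get(key string) (interface{}, bool) {
	s.lock.RLock()
	defer s.lock.RUnlock()
	item, exists := s.items[key]
	return item, exists
}

// cache pairs the storage with a keyFunc, like the cache struct above.
type cache struct {
	cacheStorage *storage
	keyFunc      func(obj interface{}) (string, error)
}

// Add derives the key first, then delegates to the storage.
func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return err
	}
	c.cacheStorage.Add(key, obj)
	return nil
}

// Get derives the key from an object the same way, then looks it up.
func (c *cache) Get(obj interface{}) (interface{}, bool, error) {
	key, err := c.keyFunc(obj)
	if err != nil {
		return nil, false, err
	}
	item, exists := c.cacheStorage.Get(key)
	return item, exists, nil
}

// GetByKey bypasses keyFunc and goes straight to the storage.
func (c *cache) GetByKey(key string) (interface{}, bool, error) {
	item, exists := c.cacheStorage.Get(key)
	return item, exists, nil
}

func main() {
	c := &cache{cacheStorage: &storage{items: map[string]interface{}{}}, keyFunc: namespaceKeyFunc}
	_ = c.Add(meta{"default", "web"})
	_ = c.Add(meta{"", "node-1"})
	_, ok1, _ := c.GetByKey("default/web")
	_, ok2, _ := c.GetByKey("node-1")
	fmt.Println(ok1, ok2)               // true true
	fmt.Println(c.Add("not-an-object")) // object has no meta: not-an-object
}
```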

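Store and Indexer revisited

A Store has empty Indexers, so objs are simply kept in ThreadSafeStore's items map, under the key computed by keyFunc.

An Indexer has non-empty Indexers, so its keys can also be classified through Indexes, which in turn lets us retrieve all objs that share a given attribute.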


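Example

To finish, let's look at an example (taken from the test file) to see how an index is used in practice.

testIndexFunc() returns the value of the pod label "foo".

In GetIndexFuncValues(), we first build the indexer. Its keyFunc is MetaNamespaceKeyFunc, which uses namespace/name as the key (these pods have no namespace, so the key is just the name), and it registers testIndexFunc() under the name "testmodes". We then create pod1, pod2 and pod3: pod1 and pod2 have label "foo" set to "bar", and pod3 has "foo" set to "biz". After all three pods are added, the "testmodes" index holds two indexValues, "bar" and "biz": "bar" maps to the keys of pod1 and pod2, and "biz" maps to the key of pod3. The pods themselves are then retrieved with the indexer's ByIndex().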

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
)

// testIndexFunc indexes a pod by the value of its "foo" label.
func testIndexFunc(obj interface{}) ([]string, error) {
	pod := obj.(*api.Pod)
	return []string{pod.Labels["foo"]}, nil
}

func GetIndexFuncValues() {
	// Objects are stored under namespace/name keys; the "testmodes" index groups them by label "foo".
	index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"testmodes": testIndexFunc})
	pod1 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "one", Labels: map[string]string{"foo": "bar"}}}
	pod2 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "two", Labels: map[string]string{"foo": "bar"}}}
	pod3 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "three", Labels: map[string]string{"foo": "biz"}}}
	index.Add(pod1)
	index.Add(pod2)
	index.Add(pod3)
	// List every indexValue produced by testIndexFunc, then the pods stored under each one.
	keys := index.ListIndexFuncValues("testmodes")
	for _, key := range keys {
		fmt.Println("key:", key)
		items, _ := index.ByIndex("testmodes", key)
		for _, item := range items {
			fmt.Println("pod", item.(*api.Pod).ObjectMeta.Name)
		}
	}
}

func main() {
	GetIndexFuncValues()
}

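Output (ListIndexFuncValues() iterates over a Go map, so "bar" and "biz" may be printed in either order):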

key: bar
pod one
pod two
key: biz
pod three