本次文档将接着上次分析,介绍controller对store的管理。

(四) 管理store

controller中有stores字段,所有store都注册在该字段中。当然,这里所说的store是datastore,datastore中包含consul, zookeeper, etcd及boltdb这些底层的store。

初始化store

在controller的New()函数中,使用下面语句初始化stores:

1
2
3
4
// Initialize the controller's stores; if that fails, hand the error back to
// the caller together with a nil result.
if err := c.initStores(); err != nil {
return nil, err
}

controller的initStores()方法定义在/libnetwork/store.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// initStores (re)builds the controller's list of datastores.
//
// It first registers the supported KV backends. If the controller has no
// configuration it returns nil without touching c.stores. Otherwise it clears
// c.stores, creates one datastore per configured scope via initScopedStore,
// and finally calls startWatch. The first initScopedStore error is returned
// as-is and startWatch is not called in that case.
func (c *controller) initStores() error {
	// Register the supported KV store backends.
	registerKVStores()

	// Snapshot the scope configuration and reset the store list under the
	// controller lock; the lock is released before any store is created
	// because initScopedStore takes the lock itself.
	c.Lock()
	if c.cfg == nil {
		c.Unlock()
		return nil
	}
	scopes := c.cfg.Scopes
	c.stores = nil
	c.Unlock()

	for name, scopeCfg := range scopes {
		// Create the datastore for this scope and append it to c.stores.
		err := c.initScopedStore(name, scopeCfg)
		if err != nil {
			return err
		}
	}

	c.startWatch()
	return nil
}

initStores()主要调用了两个函数:registerKVStores()和initScopedStore()。

先看registerKVStores(),定义在/libnetwork/store.go中:

1
2
3
4
5
6
// registerKVStores calls Register on each of the consul, zookeeper, etcd and
// boltdb backend packages, in that order.
func registerKVStores() {
consul.Register()
zookeeper.Register()
etcd.Register()
boltdb.Register()
}

可以看出,registerKVStores()调用了各store的Register()方法,我们以boltdb为例,其Register()定义在/libkv/store/boltdb/boltdb.go中:

1
2
3
4
// Register registers boltdb to libkv by passing this package's New function
// to libkv.AddStore under the store.BOLTDB backend key.
func Register() {
libkv.AddStore(store.BOLTDB, New)
}

其又调用了libkv.AddStore(),定义在/libkv/libkv.go中:

1
2
3
4
5
6
// initializers maps each store.Backend to the Initialize function registered
// for it.
// NOTE(review): shown here without its declaration keyword — presumably an
// entry of a package-level var block; confirm against the full file.
initializers = make(map[store.Backend]Initialize)
// AddStore adds a new store backend to libkv by recording init under the
// given backend key in initializers. Because this is a plain map assignment,
// a later call with the same key replaces the earlier entry.
func AddStore(store store.Backend, init Initialize) {
initializers[store] = init
}

可以看出,libkv包中有一个全局变量initializers,而AddStore()就是把store及其对应的init函数加入到initializers这个map中;对于boltdb,其init函数就是boltdb.New()。

再来看initScopedStore(),定义在/libnetwork/store.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
// initScopedStore creates the datastore for a single scope and appends it to
// the controller's store list.
//
// The datastore is built by datastore.NewDataStore(scope, scfg); any error
// from that call is returned unchanged and c.stores is left untouched.
// (The original walkthrough notes scope is "local" in the traced run.)
func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error {
	ds, err := datastore.NewDataStore(scope, scfg)
	if err != nil {
		return err
	}

	// Append under the controller lock.
	c.Lock()
	defer c.Unlock()
	c.stores = append(c.stores, ds)
	return nil
}

initScopedStore()先调用datastore的NewDataStore()方法生成datastore,再把该datastore放入到controller的stores字段中。

datastore定义在/libnetwork/datastore/datastore.go中:

1
2
3
4
5
6
7
8
9
// datastore wraps a KV store.Store together with the per-scope state that
// newClient attaches to it.
type datastore struct {
// scope is the scope name this datastore was created for.
scope string
// store is the backing KV store returned by libkv.NewStore.
store store.Store
// cache is set by newClient (via newCache) only when caching was requested;
// otherwise it stays nil.
cache *cache
// watchCh is created by newClient as an unbuffered channel.
// NOTE(review): its senders/receivers are not shown here.
watchCh chan struct{}
// active is set to true by newClient.
// NOTE(review): what clears it is not shown here.
active bool
// sequential is set by newClient to true exactly when the scope is LocalScope.
sequential bool
// Embedded mutex. NOTE(review): which fields it guards is not shown here.
sync.Mutex
}

可以看出,datastore在store的基础上增加了scope、cache等概念。scope分为local和global两种,其中boltdb属于local,而etcd等属于global;cache提供了内存缓存机制。

来看datastore的NewDataStore(),定义在/libnetwork/datastore/datastore.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// NewDataStore creates a new instance of LibKV data store.
//
// When cfg is nil, or lacks a provider or an address, the entry for scope in
// defaultScopes is used instead; if that entry is missing or equally
// incomplete an error is returned. Caching is requested from newClient only
// for LocalScope.
//
// Values seen in the traced run of the original walkthrough: scope "local",
// provider "boltdb", address /var/lib/docker/network/files/local-kv.db.
func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
	incomplete := cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == ""
	if incomplete {
		// Fall back to the built-in configuration for this scope.
		fallback, found := defaultScopes[scope]
		if !found || fallback.Client.Provider == "" || fallback.Client.Address == "" {
			return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
		}
		cfg = fallback
	}

	useCache := scope == LocalScope
	return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, useCache)
}

可以看出,NewDataStore()主要调用了newClient():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// newClient connects to a KV store and wraps it in a datastore.
//
// scope is the datastore scope, kv the backend name, addr the backend address,
// config the optional backend configuration (an empty one is substituted when
// nil) and cached whether a cache should be attached. Caching is rejected for
// any scope other than LocalScope.
//
// For boltdb, addr is split on "," and used as-is. For every other backend,
// addr is split once on "/": the part before it is the comma-separated
// address list, and the part after it, when present, is put in front of
// defaultRootChain and stored in the package-level rootChain.
func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) {
	if cached && scope != LocalScope {
		return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
	}

	sequential := scope == LocalScope

	if config == nil {
		config = &store.Config{}
	}

	var addrs []string
	switch kv {
	case string(store.BOLTDB):
		// Parse file path
		addrs = strings.Split(addr, ",")
	default:
		// Parse URI
		parts := strings.SplitN(addr, "/", 2)
		addrs = strings.Split(parts[0], ",")

		// Add the custom prefix to the root chain
		if len(parts) == 2 {
			rootChain = append([]string{parts[1]}, defaultRootChain...)
		}
	}

	// Create the underlying KV store.
	kvStore, err := libkv.NewStore(store.Backend(kv), addrs, config)
	if err != nil {
		return nil, err
	}

	// Wrap the KV store in a datastore.
	ds := &datastore{
		scope:      scope,
		store:      kvStore,
		active:     true,
		watchCh:    make(chan struct{}),
		sequential: sequential,
	}
	if cached {
		ds.cache = newCache(ds)
	}
	return ds, nil
}

而newClient()会调用libkv包的NewStore()方法生成store,然后封装成datastore返回。所以回到libkv包看NewStore(),定义在/libkv/libkv.go中:

1
2
3
4
5
6
7
8
// NewStore creates an instance of store.
//
// It looks up the initializer registered for backend and calls it with addrs
// and options. If nothing is registered for backend, it returns an error
// built from store.ErrBackendNotSupported and supportedBackend.
func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) {
	newFn, registered := initializers[backend]
	if !registered {
		return nil, fmt.Errorf("%s %s", store.ErrBackendNotSupported.Error(), supportedBackend)
	}
	return newFn(addrs, options)
}

libkv的NewStore()从initializers中获取某kv存储对应的init函数并执行。以boltdb为例,其init函数为boltdb.New(),定义在/libkv/store/boltdb/boltdb.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// New opens a new BoltDB connection to the specified path and bucket.
//
// endpoints must contain exactly one entry, the path of the database file;
// options must be non-nil and name a bucket. The parent directory of the file
// is created if it does not exist. The database file itself is opened here
// only when options.PersistConnection is set; otherwise the returned store
// keeps a nil client and only records the path.
//
// Errors: ErrMultipleEndpointsUnsupported when the number of endpoints is not
// exactly one, ErrBoltBucketOptionMissing when no bucket is configured, or
// the error from os.MkdirAll / bolt.Open.
func New(endpoints []string, options *store.Config) (store.Store, error) {
	var (
		db          *bolt.DB
		err         error
		boltOptions *bolt.Options
	)

	// Require exactly one endpoint. The earlier check only rejected
	// len(endpoints) > 1, so an empty slice reached endpoints[0] below and
	// panicked with an index-out-of-range.
	// NOTE(review): ErrMultipleEndpointsUnsupported is reused for the empty
	// case because no dedicated error is visible from here — confirm its
	// message reads sensibly for zero endpoints.
	if len(endpoints) != 1 {
		return nil, ErrMultipleEndpointsUnsupported
	}

	if (options == nil) || (len(options.Bucket) == 0) {
		return nil, ErrBoltBucketOptionMissing
	}

	// Make sure the directory that will hold the database file exists.
	dir, _ := filepath.Split(endpoints[0])
	if err = os.MkdirAll(dir, 0750); err != nil {
		return nil, err
	}

	// Only open the database up front for persistent connections.
	if options.PersistConnection {
		boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
		db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
		if err != nil {
			return nil, err
		}
	}

	b := &BoltDB{
		client:            db,
		path:              endpoints[0],
		boltBucket:        []byte(options.Bucket),
		timeout:           transientTimeout,
		PersistConnection: options.PersistConnection,
	}

	return b, nil
}

所以,现在各个store都被封装在datastore中,然后再注册到controller的stores字段中。

管理store

再来看下controller管理store所提供的方法。

controller::closeStores()

closeStores()可以关闭controller中所有store。

1
2
3
4
5
6
// closeStores calls Close on every datastore currently returned by getStores.
func (c *controller) closeStores() {
	stores := c.getStores()
	for i := range stores {
		stores[i].Close()
	}
}

controller::getStore()

getStore()可以获取某scope类别下的store。注意,一般来说,local和global的scope下只各有一个store。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// getStore returns the first registered datastore whose Scope() equals scope,
// or nil when there is none. The lookup runs under the controller lock.
// (The original walkthrough notes two scopes are in use: local and global.)
func (c *controller) getStore(scope string) datastore.DataStore {
	c.Lock()
	defer c.Unlock()

	for i := range c.stores {
		if ds := c.stores[i]; ds.Scope() == scope {
			return ds
		}
	}
	return nil
}

controller::getStores()

getStores()可以获取controller中所有store。

1
2
3
4
5
6
7
// getStores returns the controller's datastore list, read under the
// controller lock. The slice is returned as-is, not copied.
func (c *controller) getStores() []datastore.DataStore {
	c.Lock()
	stores := c.stores
	c.Unlock()
	return stores
}

管理store中的内容

接着来看controller管理store中内容所提供的方法。

controller::getNetworkFromStore()

getNetworkFromStore()会轮询controller中所有store,然后获取nid对应的network。其核心调用的是datastore的GetObject()。network的key形式为"docker/network/v1.0/network/808d5fe607329f5e925492c4ae319b89ab021ba463cb59b8e3c93bd35a56d261/"。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// getNetworkFromStore walks the controller's datastores in order and returns
// the network with id nid from the first store that has it.
//
// A store whose lookup fails is skipped; failures other than
// datastore.ErrKeyNotFound are logged at debug level. Once the network is
// found, its endpoint count is loaded from the same store; if that fails and
// the network is not marked inDelete, an error is returned. If no store holds
// the network, a "not found" error is returned.
func (c *controller) getNetworkFromStore(nid string) (*network, error) {
	for _, ds := range c.getStores() {
		n := &network{id: nid, ctrlr: c}

		// Fetch the network object.
		if err := ds.GetObject(datastore.Key(n.Key()...), n); err != nil {
			// Continue searching in the next store if the key is not found in this store
			if err != datastore.ErrKeyNotFound {
				log.Debugf("could not find network %s: %v", nid, err)
			}
			continue
		}

		// Fetch the endpoint count that belongs to the network.
		cnt := &endpointCnt{n: n}
		if err := ds.GetObject(datastore.Key(cnt.Key()...), cnt); err != nil && !n.inDelete {
			return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
		}

		n.epCnt = cnt
		n.scope = ds.Scope()
		return n, nil
	}

	return nil, fmt.Errorf("network %s not found", nid)
}

controller::getNetworksForScope()

getNetworksForScope()可以获取scope对应store下所有的network。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// getNetworksForScope lists the networks held by the datastore registered for
// scope.
//
// It returns (nil, nil) when no datastore exists for scope. A List failure
// other than datastore.ErrKeyNotFound is returned as an error. A network
// whose endpoint count cannot be loaded is logged and left out of the result,
// unless it is marked inDelete, in which case it is still included.
func (c *controller) getNetworksForScope(scope string) ([]*network, error) {
	ds := c.getStore(scope)
	if ds == nil {
		return nil, nil
	}

	// Fetch the list of networks.
	objs, err := ds.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c})
	if err != nil && err != datastore.ErrKeyNotFound {
		return nil, fmt.Errorf("failed to get networks for scope %s: %v",
			scope, err)
	}

	var networks []*network
	for _, obj := range objs {
		n := obj.(*network)
		n.ctrlr = c

		cnt := &endpointCnt{n: n}
		if err := ds.GetObject(datastore.Key(cnt.Key()...), cnt); err != nil && !n.inDelete {
			log.Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(cnt.Key()...), n.Name(), err)
			continue
		}

		n.epCnt = cnt
		n.scope = scope
		networks = append(networks, n)
	}

	return networks, nil
}

controller::getNetworksFromStore()

getNetworksFromStore()可以获取所有store中的network。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// getNetworksFromStore collects the networks from every datastore of the
// controller.
//
// A store whose List call fails is skipped; failures other than
// datastore.ErrKeyNotFound are logged at debug level. A network whose
// endpoint count cannot be loaded is logged and left out, unless it is marked
// inDelete. Fields of each network are assigned under that network's lock.
// The returned error is always nil.
func (c *controller) getNetworksFromStore() ([]*network, error) {
	var networks []*network

	for _, ds := range c.getStores() {
		objs, err := ds.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c})
		// Continue searching in the next store if no keys found in this store
		switch err {
		case nil:
		case datastore.ErrKeyNotFound:
			continue
		default:
			log.Debugf("failed to get networks for scope %s: %v", ds.Scope(), err)
			continue
		}

		for _, obj := range objs {
			n := obj.(*network)

			n.Lock()
			n.ctrlr = c
			n.Unlock()

			cnt := &endpointCnt{n: n}
			if err := ds.GetObject(datastore.Key(cnt.Key()...), cnt); err != nil && !n.inDelete {
				log.Warnf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(cnt.Key()...), n.Name(), err)
				continue
			}

			n.Lock()
			n.epCnt = cnt
			n.scope = ds.Scope()
			n.Unlock()

			networks = append(networks, n)
		}
	}

	return networks, nil
}

network::getEndpointFromStore()

getEndpointFromStore()可以依据eid获取endpoint。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// getEndpointFromStore walks the controller's datastores in order and returns
// the endpoint with id eid from the first store that has it.
//
// Lookup failures other than datastore.ErrKeyNotFound are logged at debug
// level and collected; if no store yields the endpoint, the collected
// failures are included in the returned error.
func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
	var failures []string

	for _, ds := range n.ctrlr.getStores() {
		ep := &endpoint{id: eid, network: n}

		err := ds.GetObject(datastore.Key(ep.Key()...), ep)
		switch err {
		case nil:
			return ep, nil
		case datastore.ErrKeyNotFound:
			// Continue searching in the next store if the key is not found in this store
		default:
			failures = append(failures, fmt.Sprintf("{%s:%v}, ", ds.Scope(), err))
			log.Debugf("could not find endpoint %s in %s: %v", eid, ds.Scope(), err)
		}
	}

	return nil, fmt.Errorf("could not find endpoint %s: %v", eid, failures)
}

network::getEndpointsFromStore()

getEndpointsFromStore()可以获取所有store中的endpoint。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// getEndpointsFromStore collects this network's endpoints from every
// datastore of its controller.
//
// A store whose List call fails is skipped; failures other than
// datastore.ErrKeyNotFound are logged at debug level. The returned error is
// always nil.
func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
	var endpoints []*endpoint

	// Scratch endpoint used only to derive the key prefix for this network.
	probe := endpoint{network: n}

	for _, ds := range n.getController().getStores() {
		objs, err := ds.List(datastore.Key(probe.KeyPrefix()...), &endpoint{network: n})
		// Continue searching in the next store if no keys found in this store
		switch err {
		case nil:
		case datastore.ErrKeyNotFound:
			continue
		default:
			log.Debugf("failed to get endpoints for network %s scope %s: %v",
				n.Name(), ds.Scope(), err)
			continue
		}

		for _, obj := range objs {
			endpoints = append(endpoints, obj.(*endpoint))
		}
	}

	return endpoints, nil
}

controller::updateToStore()

updateToStore()为更新操作。其核心调用为datastore的PutObjectAtomic()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// updateToStore writes kvObject to the datastore matching its DataScope using
// PutObjectAtomic.
//
// It returns an error when no datastore exists for that scope.
// datastore.ErrKeyModified is returned unwrapped so callers can compare
// against it; any other failure is wrapped with the object's type.
func (c *controller) updateToStore(kvObject datastore.KVObject) error {
	cs := c.getStore(kvObject.DataScope())
	if cs == nil {
		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
	}

	err := cs.PutObjectAtomic(kvObject)
	switch err {
	case nil:
		return nil
	case datastore.ErrKeyModified:
		return err
	default:
		return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
	}
}

controller::deleteFromStore()

deleteFromStore()可以从store中删除内容。其核心调用为datastore的DeleteObjectAtomic()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// deleteFromStore removes kvObject from the datastore matching its DataScope
// using DeleteObjectAtomic.
//
// It returns an error when no datastore exists for that scope. When the
// delete reports datastore.ErrKeyModified, kvObject is re-read from the store
// and the delete is attempted again, repeating for as long as that error
// keeps coming back. Any other delete error is returned unchanged.
func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
	cs := c.getStore(kvObject.DataScope())
	if cs == nil {
		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
	}

	for {
		err := cs.DeleteObjectAtomic(kvObject)
		if err == nil {
			return nil
		}
		if err != datastore.ErrKeyModified {
			return err
		}

		// The stored object changed underneath us: refresh kvObject to the
		// latest version, then try the delete again.
		if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
			return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
		}
	}
}

Watch方法

kv数据库一般都会提供watch机制,下面来看controller中与watch相关的方法。

controller::startWatch()

startWatch()会启动一个goroutine来消费controller的watchCh和unWatchCh中的内容。其中watchCh中的内容由processEndpointCreate()处理,unWatchCh中的内容由processEndpointDelete()处理;watchCh中的内容由watchSvcRecord()方法放入,unWatchCh中的内容由unWatchSvcRecord()方法放入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// startWatch creates the watch and unwatch channels and the network watch
// map, then starts watchLoop in a new goroutine. It does nothing when
// c.watchCh is already set, so repeated calls start the loop only once.
func (c *controller) startWatch() {
	if c.watchCh == nil {
		c.watchCh = make(chan *endpoint)
		c.unWatchCh = make(chan *endpoint)
		c.nmap = make(map[string]*netWatch)

		go c.watchLoop()
	}
}
// watchLoop receives endpoints from c.watchCh and c.unWatchCh forever.
// An endpoint from watchCh is passed to processEndpointCreate and one from
// unWatchCh to processEndpointDelete, both together with c.nmap.
// The loop has no exit condition.
func (c *controller) watchLoop() {
for {
select {
case ep := <-c.watchCh:
c.processEndpointCreate(c.nmap, ep)
case ep := <-c.unWatchCh:
c.processEndpointDelete(c.nmap, ep)
}
}
}
// watchSvcRecord sends ep on c.watchCh, where watchLoop picks it up and
// hands it to processEndpointCreate. The channel is created unbuffered by
// startWatch, so the send blocks until it is received.
func (c *controller) watchSvcRecord(ep *endpoint) {
c.watchCh <- ep
}
// unWatchSvcRecord sends ep on c.unWatchCh, where watchLoop picks it up and
// hands it to processEndpointDelete. The channel is created unbuffered by
// startWatch, so the send blocks until it is received.
func (c *controller) unWatchSvcRecord(ep *endpoint) {
c.unWatchCh <- ep
}

感觉watch操作和docker service有关,现在对service还不熟,留着以后分析。