本次文档将接着上次分析,介绍controller对store的管理。
(四) 管理store
controller中有stores字段,所有store都注册在该字段中。当然,这里所说的store是datastore,datastore中包含consul, zookeeper, etcd及boltdb这些底层的store。
初始化store
在controller的New()函数中,使用下面语句初始化stores:
1 2 3 4
| if err := c.initStores(); err != nil { return nil, err }
|
controller的initStores()方法定义在/libnetwork/store.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func (c *controller) initStores() error { registerKVStores() c.Lock() if c.cfg == nil { c.Unlock() return nil } scopeConfigs := c.cfg.Scopes c.stores = nil c.Unlock() for scope, scfg := range scopeConfigs { if err := c.initScopedStore(scope, scfg); err != nil { return err } } c.startWatch() return nil }
|
initStores()主要调用了两个函数:registerKVStores()和initScopedStore()。
先看registerKVStores(),定义在/libnetwork/store.go中:
1 2 3 4 5 6
| func registerKVStores() { consul.Register() zookeeper.Register() etcd.Register() boltdb.Register() }
|
可以看出,registerKVStores()调用了各store的Register()方法,我们以boltdb为例,其Register()定义在/libkv/store/boltdb/boltdb.go中:
1 2 3 4
| func Register() { libkv.AddStore(store.BOLTDB, New) }
|
其又调用了libkv.AddStore(),定义在/libkv/libkv.go中:
1 2 3 4 5 6
| initializers = make(map[store.Backend]Initialize) func AddStore(store store.Backend, init Initialize) { initializers[store] = init }
|
可以看出,libkv包中有一个全局变量initializers,而addStore就是把store和init函数加入到initializers map中,对于boltdb,其init函数就是boltdb.New()。
再来看initScopedStore(),定义在/libnetwork/store.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error { store, err := datastore.NewDataStore(scope, scfg) if err != nil { return err } c.Lock() c.stores = append(c.stores, store) c.Unlock() return nil }
|
initScopedStore()先调用datastore的NewDataStore()方法生成datastore,再把该datastore放入到controller的stores字段中。
datastore定义在/libnetwork/datastore/datastore.go中:
1 2 3 4 5 6 7 8 9
| type datastore struct { scope string store store.Store cache *cache watchCh chan struct{} active bool sequential bool sync.Mutex }
|
可以看出,datastore在store的基础上增加了scope,cache这些概念。scope为local和global,其中boltdb为local,而etcd等为global;cache提供了内存缓存机制。
来看datastore的NewDataStore(),定义在/libnetwork/datastore/datastore.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) { if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" { c, ok := defaultScopes[scope] if !ok || c.Client.Provider == "" || c.Client.Address == "" { return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope) } cfg = c } var cached bool if scope == LocalScope { cached = true } return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached) }
|
可以看出,NewDataStore()主要调用了newClient():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) { if cached && scope != LocalScope { return nil, fmt.Errorf("caching supported only for scope %s", LocalScope) } sequential := false if scope == LocalScope { sequential = true } if config == nil { config = &store.Config{} } var addrs []string if kv == string(store.BOLTDB) { addrs = strings.Split(addr, ",") } else { parts := strings.SplitN(addr, "/", 2) addrs = strings.Split(parts[0], ",") if len(parts) == 2 { rootChain = append([]string{parts[1]}, defaultRootChain...) } } store, err := libkv.NewStore(store.Backend(kv), addrs, config) if err != nil { return nil, err } ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{}), sequential: sequential} if cached { ds.cache = newCache(ds) } return ds, nil }
|
而newClient()会调用libkv包的NewStore()方法生成store,然后封装成datastore返回。所以回到libkv包看NewStore(),定义在/libkv/libkv.go中:
1 2 3 4 5 6 7 8
| func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) { if init, exists := initializers[backend]; exists { return init(addrs, options) } return nil, fmt.Errorf("%s %s", store.ErrBackendNotSupported.Error(), supportedBackend) }
|
libkv的NewStore()从initializers获取到某kv存储对应的init函数,并执行。以boltdb为例,其init函数为boltdb.new(),定义在/libkv/store/boltdb/boltdb.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| func New(endpoints []string, options *store.Config) (store.Store, error) { var ( db *bolt.DB err error boltOptions *bolt.Options ) if len(endpoints) > 1 { return nil, ErrMultipleEndpointsUnsupported } if (options == nil) || (len(options.Bucket) == 0) { return nil, ErrBoltBucketOptionMissing } dir, _ := filepath.Split(endpoints[0]) if err = os.MkdirAll(dir, 0750); err != nil { return nil, err } if options.PersistConnection { boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout} db, err = bolt.Open(endpoints[0], filePerm, boltOptions) if err != nil { return nil, err } } b := &BoltDB{ client: db, path: endpoints[0], boltBucket: []byte(options.Bucket), timeout: transientTimeout, PersistConnection: options.PersistConnection, } return b, nil }
|
所以,现在store都被封装在dataStore中,然后在注册在controller的stores中。
管理store
再来看下controller管理store所提供的方法。
controller::closeStores()
closeStores()可以关闭controller中所有store。
1 2 3 4 5 6
| func (c *controller) closeStores() { for _, store := range c.getStores() { store.Close() } }
|
controller::getStore()
getStore()可以获取某scope类别下的store。注意,一般来说,local和global的scope下只各有一个store。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (c *controller) getStore(scope string) datastore.DataStore { c.Lock() defer c.Unlock() for _, store := range c.stores { if store.Scope() == scope { return store } } return nil }
|
controller::getStores()
getStores()可以获取controller中所有store。
1 2 3 4 5 6 7
| func (c *controller) getStores() []datastore.DataStore { c.Lock() defer c.Unlock() return c.stores }
|
管理store中的内容
接着来看controller管理store中内容所提供的方法。
controller::getNetworkFromStore()
getNetworkFromStore()会轮询controller中所有store,然后获取nid对应的network。其核心调用的是datastore的GetObject()。network的key形式为”docker/network/v1.0/network/808d5fe607329f5e925492c4ae319b89ab021ba463cb59b8e3c93bd35a56d261/“。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func (c *controller) getNetworkFromStore(nid string) (*network, error) { for _, store := range c.getStores() { n := &network{id: nid, ctrlr: c} err := store.GetObject(datastore.Key(n.Key()...), n) if err != nil { if err != datastore.ErrKeyNotFound { log.Debugf("could not find network %s: %v", nid, err) } continue } ec := &endpointCnt{n: n} err = store.GetObject(datastore.Key(ec.Key()...), ec) if err != nil && !n.inDelete { return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err) } n.epCnt = ec n.scope = store.Scope() return n, nil } return nil, fmt.Errorf("network %s not found", nid) }
|
controller::getNetworksForScope()
getNetworksForScope()可以获取scope对应store下所有的network。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (c *controller) getNetworksForScope(scope string) ([]*network, error) { var nl []*network store := c.getStore(scope) if store == nil { return nil, nil } kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c}) if err != nil && err != datastore.ErrKeyNotFound { return nil, fmt.Errorf("failed to get networks for scope %s: %v", scope, err) } for _, kvo := range kvol { n := kvo.(*network) n.ctrlr = c ec := &endpointCnt{n: n} err = store.GetObject(datastore.Key(ec.Key()...), ec) if err != nil && !n.inDelete { log.Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err) continue } n.epCnt = ec n.scope = scope nl = append(nl, n) } return nl, nil }
|
controller::getNetworksFromStore()
getNetworksFromStore()可以获取所有store中的network。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| func (c *controller) getNetworksFromStore() ([]*network, error) { var nl []*network for _, store := range c.getStores() { kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c}) if err != nil { if err != datastore.ErrKeyNotFound { log.Debugf("failed to get networks for scope %s: %v", store.Scope(), err) } continue } for _, kvo := range kvol { n := kvo.(*network) n.Lock() n.ctrlr = c n.Unlock() ec := &endpointCnt{n: n} err = store.GetObject(datastore.Key(ec.Key()...), ec) if err != nil && !n.inDelete { log.Warnf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err) continue } n.Lock() n.epCnt = ec n.scope = store.Scope() n.Unlock() nl = append(nl, n) } } return nl, nil }
|
controller::getEndpointFromStore()
getEndpointFromStore()可以依据eid获取endpoint。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (n *network) getEndpointFromStore(eid string) (*endpoint, error) { var errors []string for _, store := range n.ctrlr.getStores() { ep := &endpoint{id: eid, network: n} err := store.GetObject(datastore.Key(ep.Key()...), ep) if err != nil { if err != datastore.ErrKeyNotFound { errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err)) log.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err) } continue } return ep, nil } return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors) }
|
controller::getEndpointsFromStore()
getEndpointsFromStore()可以获取所有store中的endpoint。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func (n *network) getEndpointsFromStore() ([]*endpoint, error) { var epl []*endpoint tmp := endpoint{network: n} for _, store := range n.getController().getStores() { kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n}) if err != nil { if err != datastore.ErrKeyNotFound { log.Debugf("failed to get endpoints for network %s scope %s: %v", n.Name(), store.Scope(), err) } continue } for _, kvo := range kvol { ep := kvo.(*endpoint) epl = append(epl, ep) } } return epl, nil }
|
controller::updateToStore()
updateToStore()为更新操作。其核心调用为datastore的PutObjectAtomic()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (c *controller) updateToStore(kvObject datastore.KVObject) error { cs := c.getStore(kvObject.DataScope()) if cs == nil { return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope()) } if err := cs.PutObjectAtomic(kvObject); err != nil { if err == datastore.ErrKeyModified { return err } return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err) } return nil }
|
controller::deleteFromStore()
deleteFromStore()可以从store中删除内容。其核心调用为datastore的DeleteObjectAtomic()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (c *controller) deleteFromStore(kvObject datastore.KVObject) error { cs := c.getStore(kvObject.DataScope()) if cs == nil { return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope()) } retry: if err := cs.DeleteObjectAtomic(kvObject); err != nil { if err == datastore.ErrKeyModified { if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil { return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err) } goto retry } return err } return nil }
|
Watch方法
kv数据库都会提供watch方法,来看下controller的watch方法提供的方法。
controller::startWatch()
startWatch()会启动go routine消费controller的watchCh和unWatchCh中的内容。其中watchCh中的内容由processEndpointCreate()处理;unWatchCh中的内容 由processEndpointDelete()处理。watchCh中的内容由watchSvcRecord()方法放入;unWatchCh中的内容由unWatchSvcRecord()放入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (c *controller) startWatch() { if c.watchCh != nil { return } c.watchCh = make(chan *endpoint) c.unWatchCh = make(chan *endpoint) c.nmap = make(map[string]*netWatch) go c.watchLoop() } func (c *controller) watchLoop() { for { select { case ep := <-c.watchCh: c.processEndpointCreate(c.nmap, ep) case ep := <-c.unWatchCh: c.processEndpointDelete(c.nmap, ep) } } } func (c *controller) watchSvcRecord(ep *endpoint) { c.watchCh <- ep } func (c *controller) unWatchSvcRecord(ep *endpoint) { c.unWatchCh <- ep }
|
感觉watch操作和docker service有关,现在对service还不熟,留着以后分析。