本次文档将接着上次分析,介绍controller对store的管理。
(四) 管理store
controller中有stores字段,所有store都注册在该字段中。当然,这里所说的store是datastore,datastore中包含consul, zookeeper, etcd及boltdb这些底层的store。
初始化store
在controller的New()函数中,使用下面语句初始化stores:
1 2 3 4 
  | if err := c.initStores(); err != nil { 	return nil, err } 
  | 
controller的initStores()方法定义在/libnetwork/store.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 
  | func (c *controller) initStores() error { 	 	registerKVStores() 	c.Lock() 	if c.cfg == nil { 		c.Unlock() 		return nil 	} 	scopeConfigs := c.cfg.Scopes 	c.stores = nil 	c.Unlock() 	for scope, scfg := range scopeConfigs { 		 		if err := c.initScopedStore(scope, scfg); err != nil { 			return err 		} 	} 	c.startWatch() 	return nil } 
  | 
initStores()主要调用了两个函数:registerKVStores()和initScopedStore()。
先看registerKVStores(),定义在/libnetwork/store.go中:
1 2 3 4 5 6 
  | func registerKVStores() { 	consul.Register() 	zookeeper.Register() 	etcd.Register() 	boltdb.Register() } 
  | 
可以看出,registerKVStores()调用了各store的Register()方法,我们以boltdb为例,其Register()定义在/libkv/store/boltdb/boltdb.go中:
1 2 3 4 
  | func Register() { 	libkv.AddStore(store.BOLTDB, New) } 
  | 
其又调用了libkv.AddStore(),定义在/libkv/libkv.go中:
1 2 3 4 5 6 
  | initializers = make(map[store.Backend]Initialize) func AddStore(store store.Backend, init Initialize) { 	initializers[store] = init } 
  | 
可以看出,libkv包中有一个全局变量initializers,而addStore就是把store和init函数加入到initializers map中,对于boltdb,其init函数就是boltdb.New()。
再来看initScopedStore(),定义在/libnetwork/store.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 
  | func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error { 	 	store, err := datastore.NewDataStore(scope, scfg) 	if err != nil { 		return err 	} 	c.Lock() 	c.stores = append(c.stores, store) 	c.Unlock() 	return nil } 
  | 
initScopedStore()先调用datastore的NewDataStore()方法生成datastore,再把该datastore放入到controller的stores字段中。
datastore定义在/libnetwork/datastore/datastore.go中:
1 2 3 4 5 6 7 8 9 
  | type datastore struct { 	scope      string 	store      store.Store 	cache      *cache 	watchCh    chan struct{} 	active     bool 	sequential bool 	sync.Mutex } 
  | 
可以看出,datastore在store的基础上增加了scope,cache这些概念。scope为local和global,其中boltdb为local,而etcd等为global;cache提供了内存缓存机制。
来看datastore的NewDataStore(),定义在/libnetwork/datastore/datastore.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 
  | func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) { 	 	 	 	if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" { 		c, ok := defaultScopes[scope] 		if !ok || c.Client.Provider == "" || c.Client.Address == "" { 			return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope) 		} 		cfg = c 	} 	var cached bool 	if scope == LocalScope { 		cached = true 	} 	return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached) } 
  | 
可以看出,NewDataStore()主要调用了newClient():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 
  | func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) { 	if cached && scope != LocalScope { 		return nil, fmt.Errorf("caching supported only for scope %s", LocalScope) 	} 	sequential := false 	if scope == LocalScope { 		sequential = true 	} 	if config == nil { 		config = &store.Config{} 	} 	var addrs []string 	if kv == string(store.BOLTDB) { 		 		addrs = strings.Split(addr, ",") 	} else { 		 		parts := strings.SplitN(addr, "/", 2) 		addrs = strings.Split(parts[0], ",") 		 		if len(parts) == 2 { 			rootChain = append([]string{parts[1]}, defaultRootChain...) 		} 	} 	 	store, err := libkv.NewStore(store.Backend(kv), addrs, config) 	if err != nil { 		return nil, err 	} 	 	ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{}), sequential: sequential} 	if cached { 		ds.cache = newCache(ds) 	} 	return ds, nil } 
  | 
而newClient()会调用libkv包的NewStore()方法生成store,然后封装成datastore返回。所以回到libkv包看NewStore(),定义在/libkv/libkv.go中:
1 2 3 4 5 6 7 8 
  | func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) { 	if init, exists := initializers[backend]; exists { 		return init(addrs, options) 	} 	return nil, fmt.Errorf("%s %s", store.ErrBackendNotSupported.Error(), supportedBackend) } 
  | 
libkv的NewStore()从initializers获取到某kv存储对应的init函数,并执行。以boltdb为例,其init函数为boltdb.new(),定义在/libkv/store/boltdb/boltdb.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 
  | func New(endpoints []string, options *store.Config) (store.Store, error) { 	var ( 		db          *bolt.DB 		err         error 		boltOptions *bolt.Options 	) 	if len(endpoints) > 1 { 		return nil, ErrMultipleEndpointsUnsupported 	} 	if (options == nil) || (len(options.Bucket) == 0) { 		return nil, ErrBoltBucketOptionMissing 	} 	dir, _ := filepath.Split(endpoints[0]) 	if err = os.MkdirAll(dir, 0750); err != nil { 		return nil, err 	} 	if options.PersistConnection { 		boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout} 		db, err = bolt.Open(endpoints[0], filePerm, boltOptions) 		if err != nil { 			return nil, err 		} 	} 	b := &BoltDB{ 		client:            db, 		path:              endpoints[0], 		boltBucket:        []byte(options.Bucket), 		timeout:           transientTimeout, 		PersistConnection: options.PersistConnection, 	} 	return b, nil } 
  | 
所以,现在store都被封装在dataStore中,然后在注册在controller的stores中。
管理store
再来看下controller管理store所提供的方法。
controller::closeStores()
closeStores()可以关闭controller中所有store。
1 2 3 4 5 6 
  | func (c *controller) closeStores() { 	for _, store := range c.getStores() { 		store.Close() 	} } 
  | 
controller::getStore()
getStore()可以获取某scope类别下的store。注意,一般来说,local和global的scope下只各有一个store。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 
  | func (c *controller) getStore(scope string) datastore.DataStore { 	c.Lock() 	defer c.Unlock() 	for _, store := range c.stores { 		if store.Scope() == scope { 			return store 		} 	} 	return nil } 
  | 
controller::getStores()
getStores()可以获取controller中所有store。
1 2 3 4 5 6 7 
  | func (c *controller) getStores() []datastore.DataStore { 	c.Lock() 	defer c.Unlock() 	return c.stores } 
  | 
管理store中的内容
接着来看controller管理store中内容所提供的方法。
controller::getNetworkFromStore()
getNetworkFromStore()会轮询controller中所有store,然后获取nid对应的network。其核心调用的是datastore的GetObject()。network的key形式为”docker/network/v1.0/network/808d5fe607329f5e925492c4ae319b89ab021ba463cb59b8e3c93bd35a56d261/“。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 
  | func (c *controller) getNetworkFromStore(nid string) (*network, error) { 	for _, store := range c.getStores() { 		n := &network{id: nid, ctrlr: c} 		 		err := store.GetObject(datastore.Key(n.Key()...), n) 		 		if err != nil { 			if err != datastore.ErrKeyNotFound { 				log.Debugf("could not find network %s: %v", nid, err) 			} 			continue 		} 		 		ec := &endpointCnt{n: n} 		err = store.GetObject(datastore.Key(ec.Key()...), ec) 		if err != nil && !n.inDelete { 			return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err) 		} 		n.epCnt = ec 		n.scope = store.Scope() 		return n, nil 	} 	return nil, fmt.Errorf("network %s not found", nid) } 
  | 
controller::getNetworksForScope()
getNetworksForScope()可以获取scope对应store下所有的network。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 
  | func (c *controller) getNetworksForScope(scope string) ([]*network, error) { 	var nl []*network 	store := c.getStore(scope) 	if store == nil { 		return nil, nil 	} 	 	kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), 		&network{ctrlr: c}) 	if err != nil && err != datastore.ErrKeyNotFound { 		return nil, fmt.Errorf("failed to get networks for scope %s: %v", 			scope, err) 	} 	for _, kvo := range kvol { 		n := kvo.(*network) 		n.ctrlr = c 		ec := &endpointCnt{n: n} 		err = store.GetObject(datastore.Key(ec.Key()...), ec) 		if err != nil && !n.inDelete { 			log.Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err) 			continue 		} 		n.epCnt = ec 		n.scope = scope 		nl = append(nl, n) 	} 	return nl, nil } 
  | 
controller::getNetworksFromStore()
getNetworksFromStore()可以获取所有store中的network。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 
  | func (c *controller) getNetworksFromStore() ([]*network, error) { 	var nl []*network 	for _, store := range c.getStores() { 		kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), 			&network{ctrlr: c}) 		 		if err != nil { 			if err != datastore.ErrKeyNotFound { 				log.Debugf("failed to get networks for scope %s: %v", store.Scope(), err) 			} 			continue 		} 		for _, kvo := range kvol { 			n := kvo.(*network) 			n.Lock() 			n.ctrlr = c 			n.Unlock() 			ec := &endpointCnt{n: n} 			err = store.GetObject(datastore.Key(ec.Key()...), ec) 			if err != nil && !n.inDelete { 				log.Warnf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err) 				continue 			} 			n.Lock() 			n.epCnt = ec 			n.scope = store.Scope() 			n.Unlock() 			nl = append(nl, n) 		} 	} 	return nl, nil } 
  | 
controller::getEndpointFromStore()
getEndpointFromStore()可以依据eid获取endpoint。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 
  | func (n *network) getEndpointFromStore(eid string) (*endpoint, error) { 	var errors []string 	for _, store := range n.ctrlr.getStores() { 		ep := &endpoint{id: eid, network: n} 		err := store.GetObject(datastore.Key(ep.Key()...), ep) 		 		if err != nil { 			if err != datastore.ErrKeyNotFound { 				errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err)) 				log.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err) 			} 			continue 		} 		return ep, nil 	} 	return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors) } 
  | 
controller::getEndpointsFromStore()
getEndpointsFromStore()可以获取所有store中的endpoint。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 
  | func (n *network) getEndpointsFromStore() ([]*endpoint, error) { 	var epl []*endpoint 	tmp := endpoint{network: n} 	for _, store := range n.getController().getStores() { 		kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n}) 		 		if err != nil { 			if err != datastore.ErrKeyNotFound { 				log.Debugf("failed to get endpoints for network %s scope %s: %v", 					n.Name(), store.Scope(), err) 			} 			continue 		} 		for _, kvo := range kvol { 			ep := kvo.(*endpoint) 			epl = append(epl, ep) 		} 	} 	return epl, nil } 
  | 
controller::updateToStore()
updateToStore()为更新操作。其核心调用为datastore的PutObjectAtomic()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 
  | func (c *controller) updateToStore(kvObject datastore.KVObject) error { 	cs := c.getStore(kvObject.DataScope()) 	if cs == nil { 		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope()) 	} 	if err := cs.PutObjectAtomic(kvObject); err != nil { 		if err == datastore.ErrKeyModified { 			return err 		} 		return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err) 	} 	return nil } 
  | 
controller::deleteFromStore()
deleteFromStore()可以从store中删除内容。其核心调用为datastore的DeleteObjectAtomic()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 
  | func (c *controller) deleteFromStore(kvObject datastore.KVObject) error { 	cs := c.getStore(kvObject.DataScope()) 	if cs == nil { 		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope()) 	} retry: 	if err := cs.DeleteObjectAtomic(kvObject); err != nil { 		if err == datastore.ErrKeyModified { 			if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil { 				return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err) 			} 			goto retry 		} 		return err 	} 	return nil } 
  | 
Watch方法
kv数据库都会提供watch方法,来看下controller的watch方法提供的方法。
controller::startWatch()
startWatch()会启动go routine消费controller的watchCh和unWatchCh中的内容。其中watchCh中的内容由processEndpointCreate()处理;unWatchCh中的内容 由processEndpointDelete()处理。watchCh中的内容由watchSvcRecord()方法放入;unWatchCh中的内容由unWatchSvcRecord()放入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 
  | func (c *controller) startWatch() { 	if c.watchCh != nil { 		return 	} 	c.watchCh = make(chan *endpoint) 	c.unWatchCh = make(chan *endpoint) 	c.nmap = make(map[string]*netWatch) 	go c.watchLoop() } func (c *controller) watchLoop() { 	for { 		select { 		case ep := <-c.watchCh: 			c.processEndpointCreate(c.nmap, ep) 		case ep := <-c.unWatchCh: 			c.processEndpointDelete(c.nmap, ep) 		} 	} } func (c *controller) watchSvcRecord(ep *endpoint) { 	c.watchCh <- ep } func (c *controller) unWatchSvcRecord(ep *endpoint) { 	c.unWatchCh <- ep } 
  | 
感觉watch操作和docker service有关,现在对service还不熟,留着以后分析。