Kube-proxy在Kubernetes中负责把service的流量导入到具体的pod上。所以kube-proxy需要从apiserver获取service及endpoint信息,而这些信息的获取就是通过config来管理的。该处的config和配置不同,应该理解为对信息来源的管理。Kube-proxy只支持从apiserver获取信息;而kubelet可以从apiserver,file和http三种渠道获取信息。本次分析将介绍kube-proxy的config,即kube-proxy是如何从apiserver中获取service及endpoint的变化,是如何把这种变化交给处理函数的。
在kube-proxy中有ServiceConfig和EndpointConfig,我们以ServiceConfig为例子。
ServiceConfig是kube-proxy从各渠道收集services信息,然后经mux merge()后,把各渠道的信息汇总到ServiceStore对应的source key下。最后通过BroadCaster先合并所有source下的services,并交给注册好的handler处理。
先来看两个概念:Mux和Broadcaster。
Mux
Mux可以创建多条channel,并与相关config的storage配合把这些channel中的数据进行合并。Mux定义在/pkg/util/config/config.go中:
1 2 3 4 5 6 7 8 9 10 11 
  | type Mux struct { 	 	merger Merger 	 	sourceLock sync.RWMutex 	 	sources map[string]chan interface{} } 
  | 
其中,Merger负责把数据进行合并;sources中记录了名称及对应的channel。
可以使用NewMux()函数生成Mux:
1 2 3 4 5 6 7 8 
  | func NewMux(merger Merger) *Mux { 	mux := &Mux{ 		sources: make(map[string]chan interface{}), 		merger:  merger, 	} 	return mux } 
  | 
在创建Mux时需要指定Merger,而sources中的channel目前为空。那么,如何建立source channel,又如何把source channel中的内容交Merger处理呢?来看Mux的Channel()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 
  | func (m *Mux) Channel(source string) chan interface{} { 	if len(source) == 0 { 		panic("Channel given an empty name") 	} 	m.sourceLock.Lock() 	defer m.sourceLock.Unlock() 	channel, exists := m.sources[source] 	if exists { 		return channel 	} 	newChannel := make(chan interface{}) 	m.sources[source] = newChannel 	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop) 	return newChannel } func (m *Mux) listen(source string, listenChannel <-chan interface{}) { 	for update := range listenChannel { 		m.merger.Merge(source, update) 	} } 
  | 
Channel()方法会创建一个channel,并启动一个routine调用listen(),最后把channel返回。listen()就是监听刚创建的channel,然后通过merger.Merge()处理数据。所以,通过调用Mux的Channel()方法,我们可以获取到一个channel,我们只要往这个channel中写数据即可,Mux自动会把数据交给merger处理。
再来看下什么是Merger,Merger概念定义在/pkg/util/config/config.go中:
1 2 3 4 5 6 
  | type Merger interface { 	 	 	 	Merge(source string, update interface{}) error } 
  | 
可以看出,只要实现了Merge()方法的结构体都可以称为Merger,很简洁。
Broadcaster
再来看下Broadcaster,从概念的字面上即可知道这是分发用的。Broadcaster定义在/pkg/util/config.config.go中:
1 2 3 4 5 6 7 8 9 10 
  | type Broadcaster struct { 	 	listenerLock sync.RWMutex 	listeners    []Listener } type Listener interface { 	 	OnUpdate(instance interface{}) } 
  | 
Broadcaster中有多个listener。只要实现了OnUpdate()的都是Listener,kube-proxy的proxier就是一个Listener。
可以使用NewBroadcaster()生成一个Broadcaster:
1 2 3 4 5 
  | func NewBroadcaster() *Broadcaster { 	return &Broadcaster{} } 
  | 
可以看出,新生成的Broadcaster并没有Listener,所以Broadcaster必须提供一个添加Listener的方法:
1 2 3 4 5 6 
  | func (b *Broadcaster) Add(listener Listener) { 	b.listenerLock.Lock() 	defer b.listenerLock.Unlock() 	b.listeners = append(b.listeners, listener) } 
  | 
Broadcaster还需要提供分发的方法:
1 2 3 4 5 6 7 8 9 
  | func (b *Broadcaster) Notify(instance interface{}) { 	b.listenerLock.RLock() 	listeners := b.listeners 	b.listenerLock.RUnlock() 	for _, listener := range listeners { 		listener.OnUpdate(instance) 	} } 
  | 
serviceStore
serviceStore是对多个渠道过来的数据的一个汇总的存储场所,即可以理解为是一个Merger,定义在/pkg/proxy/config/config.go中:
1 2 3 4 5 
  | type serviceStore struct { 	serviceLock sync.RWMutex 	services    map[string]map[types.NamespacedName]api.Service 	updates     chan<- struct{} } 
  | 
serviceStore包含一个services字段用来存储渠道名称及该渠道客户的service;来有一个updates字段用来作消息通道用。
serviceStore提供了Merge()方法以把数据存储到自身中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 
  | func (s *serviceStore) Merge(source string, change interface{}) error { 	s.serviceLock.Lock() 	services := s.services[source] 	if services == nil { 		services = make(map[types.NamespacedName]api.Service) 	} 	update := change.(ServiceUpdate) 	switch update.Op { 	case ADD: 		glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services)) 		for _, value := range update.Services { 			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} 			services[name] = value 		} 	case REMOVE: 		glog.V(5).Infof("Removing a service %s", spew.Sdump(update)) 		for _, value := range update.Services { 			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} 			delete(services, name) 		} 	case SET: 		glog.V(5).Infof("Setting services %s", spew.Sdump(update)) 		 		services = make(map[types.NamespacedName]api.Service) 		for _, value := range update.Services { 			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} 			services[name] = value 		} 	default: 		glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) 	} 	s.services[source] = services 	s.serviceLock.Unlock() 	if s.updates != nil { 		 		 		select { 		case s.updates <- struct{}{}: 		default: 			glog.V(4).Infof("Service handler already has a pending interrupt.") 		} 	} 	return nil } 
  | 
Merge()根据update的type来进行不同的处理。一般来说,ADD和DELETE用的不多(重新List到的数据和内存数据不一致的时候才会),而基于ETCD事件全部走SET分支。
可以看出,Merge()把某个source个来的数据都放在services中的source key下。最后,Merge()向serviceStore的update发送信号。
刚才分析过,serviceStore中存储了多个source中的数据。serviceStore必须提供方法用来获取这些数据,这个方法就是MergedState():
1 2 3 4 5 6 7 8 9 10 11 12 
  | func (s *serviceStore) MergedState() interface{} { 	s.serviceLock.RLock() 	defer s.serviceLock.RUnlock() 	services := make([]api.Service, 0) 	for _, sourceServices := range s.services { 		for _, value := range sourceServices { 			services = append(services, value) 		} 	} 	return services } 
  | 
MergedState()把serviceStore中所有source下的service整合到一起返回。
ServiceConfig
ServiceConfig基本上是对上述结构体的一个封装,定义在/pkg/proxy/config/config.go中:
1 2 3 4 5 6 7 
  | type ServiceConfig struct { 	mux     *config.Mux 	bcaster *config.Broadcaster 	store   *serviceStore } 
  | 
ServiceConfig中有一个Mux,一个Broadcaster和一个serviceStore。Mux用来管理各source channel,这些数据源的数据会被merged到serviceStore,然后经Broadcaster把serviceStore中的数据分发到各处理体中。
来看下serviceConfig的生成函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 
  | func NewServiceConfig() *ServiceConfig { 	 	 	 	 	updates := make(chan struct{}, 1) 	store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} 	mux := config.NewMux(store) 	bcaster := config.NewBroadcaster() 	 	 	go watchForUpdates(bcaster, store, updates) 	return &ServiceConfig{mux, bcaster, store} } 
  | 
从mux := config.NewMux(store),mux使用store作为merger。NewServiceConfig()会使用routine运行watchForUpdates()。watchForUpdates()定义在/pkg/proxy/config/config.go中:
1 2 3 4 5 6 7 8 
  | func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) { 	for true { 		<-updates 		bcaster.Notify(accessor.MergedState()) 	} } 
  | 
watchForUpdates()实现很简单,监听serviceStore的updates,如果有信号过来,就通过serviceStore的MergedState()方法收集所有数据,然后调用broadCaster的Notify()方法把数据进行分发处理。
既然是对mux进行了封装,ServiceConfig需要提供新建source channel的方法:
1 2 3 4 5 6 7 8 9 10 11 
  | func (c *ServiceConfig) Channel(source string) chan ServiceUpdate { 	ch := c.mux.Channel(source) 	serviceCh := make(chan ServiceUpdate) 	go func() { 		for update := range serviceCh { 			ch <- update 		} 		close(ch) 	}() 	return serviceCh } 
  | 
ServiceConfig的Channel()方法在mux的source channel的基础上,还加入了serviceCh,并启动routine把serviceCh中的内容放入到mux的source channel。所以,现在只要把内容放入到serviceCh中即可,而无需关心底层的mux。
同样的,ServiceConfig也需提供注册处理函数的方法:
1 2 3 4 5 6 7 
  | func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { 	 	c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { 		glog.V(3).Infof("Calling handler.OnServiceUpdate()") 		handler.OnServiceUpdate(instance.([]api.Service)) 	})) } 
  | 
RegisterHandler()方法很简单,就是把handler.OnServiceUpdate()注册到bcaster中。
SourceAPI
之前分析了数据是如何在内部处理的,屏蔽这些细节,在外部只要把数据放入serviceCh即可。现在来看下SourceAPI,看kube-proxy是如何把内容入到serviceCh中的。
NewSourceAPI()定义在/pkg/proxy/config/api.go中:
1 2 3 4 5 6 7 8 9 
  | func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { 	servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything()) 	cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run() 	endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) 	cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run() } 
  | 
NewSourceAPI()先 获取servicesLW,然后通过生成一个Relector把servicesLW中的数据放入ServiceStore中(此处的ServiceStore和之前的不一样)。
来看下NewServiceStore()的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 
  | func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store { 	fn := func(objs []interface{}) { 		var services []api.Service 		for _, o := range objs { 			services = append(services, *(o.(*api.Service))) 		} 		ch <- ServiceUpdate{Op: SET, Services: services} 	} 	if store == nil { 		store = cache.NewStore(cache.MetaNamespaceKeyFunc) 	} 	return &cache.UndeltaStore{ 		Store:    store, 		PushFunc: fn, 	} } 
  | 
所以,此处的ServiceStore是一个Cache.UndeltaStore (我们只需要知道某Service最新的状态即可,无需关心中间的变化),UndeltaStore的PushFunc就是把ServiceUpdate{Op: SET, Services: services}放入到serviceCh中。
UndeltaStore
UndeltaStore定义在/pkg/client/cache/undelta_store.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 
  | func (u *UndeltaStore) Add(obj interface{}) error { 	if err := u.Store.Add(obj); err != nil { 		return err 	} 	u.PushFunc(u.Store.List()) 	return nil } func (u *UndeltaStore) Update(obj interface{}) error { 	if err := u.Store.Update(obj); err != nil { 		return err 	} 	u.PushFunc(u.Store.List()) 	return nil } func (u *UndeltaStore) Delete(obj interface{}) error { 	if err := u.Store.Delete(obj); err != nil { 		return err 	} 	u.PushFunc(u.Store.List()) 	return nil } func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error { 	if err := u.Store.Replace(list, resourceVersion); err != nil { 		return err 	} 	u.PushFunc(u.Store.List()) 	return nil } 
  | 
可以看出,PushFunc()所接受的参数是Store.List(),即为全量的service。
调用
现在来看下,kube-proxy调用这些概念的。在/cmd/kube-proxy/app/server.go的NewProxyServerDefault()中:
1 2 3 
  | serviceConfig := proxyconfig.NewServiceConfig() serviceConfig.RegisterHandler(proxier) 
  | 
此处代码先生成一个serviceConfig,然后调用RegisterHandler()方法注册proxier。proxier定义有OnUpdate()方法,所以可以当作BroadCaster的Listener。
还是在NewProxyServerDefault()中:
1 2 3 4 5 6 
  | proxyconfig.NewSourceAPI( 	client.Core().RESTClient(), 	config.ConfigSyncPeriod, 	serviceConfig.Channel("api"), 	endpointsConfig.Channel("api"), ) 
  | 
代码先用serviceConfig.Channel("api")建立一个service channel,然后把该service channel作为参数传给NewSourceAPI()。现在SouceAPI会通过reflector及UndeltaStore把全量的service放入service channel中。serviceConfig又会把service channel中的内容放入到mux的source channel中。mux会调用serviceStore的Merge()把source channel中的内容合并到serviceStore中。serviceConfig中的BroadCaster会把serviceStore中的内容分发给之前注册的proxier处理。所以kube-proxy的config就是一个把多个源的内容合并,并交给处理体(可以多个)处理的过程。