Kube-proxy在Kubernetes中负责把service的流量导入到具体的pod上。所以kube-proxy需要从apiserver获取service及endpoint信息,而这些信息的获取就是通过config来管理的。该处的config和配置不同,应该理解为对信息来源的管理。Kube-proxy只支持从apiserver获取信息;而kubelet可以从apiserver,file和http三种渠道获取信息。本次分析将介绍kube-proxy的config,即kube-proxy是如何从apiserver中获取service及endpoint的变化,是如何把这种变化交给处理函数的。
在kube-proxy中有ServiceConfig和EndpointConfig,我们以ServiceConfig为例子。
ServiceConfig是kube-proxy从各渠道收集services信息,然后经mux merge()后,把各渠道的信息汇总到ServiceStore对应的source key下。最后通过BroadCaster先合并所有source下的services,并交给注册好的handler处理。
先来看两个概念:Mux和Broadcaster。
Mux
Mux可以创建多条channel,并与相关config的storage配合把这些channel中的数据进行合并。Mux定义在/pkg/util/config/config.go中:
1 2 3 4 5 6 7 8 9 10 11
| type Mux struct { merger Merger sourceLock sync.RWMutex sources map[string]chan interface{} }
|
其中,Merger负责把数据进行合并;sources中记录了名称及对应的channel。
可以使用NewMux()函数生成Mux:
1 2 3 4 5 6 7 8
| func NewMux(merger Merger) *Mux { mux := &Mux{ sources: make(map[string]chan interface{}), merger: merger, } return mux }
|
在创建Mux时需要指定Merger,而sources中的channel目前为空。那么,如何建立source channel,又如何把source channel中的内容交Merger处理呢?来看Mux的Channel()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (m *Mux) Channel(source string) chan interface{} { if len(source) == 0 { panic("Channel given an empty name") } m.sourceLock.Lock() defer m.sourceLock.Unlock() channel, exists := m.sources[source] if exists { return channel } newChannel := make(chan interface{}) m.sources[source] = newChannel go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop) return newChannel } func (m *Mux) listen(source string, listenChannel <-chan interface{}) { for update := range listenChannel { m.merger.Merge(source, update) } }
|
Channel()方法会创建一个channel,并启动一个routine调用listen(),最后把channel返回。listen()就是监听刚创建的channel,然后通过merger.Merge()处理数据。所以,通过调用Mux的Channel()方法,我们可以获取到一个channel,我们只要往这个channel中写数据即可,Mux自动会把数据交给merger处理。
再来看下什么是Merger,Merger概念定义在/pkg/util/config/config.go中:
1 2 3 4 5 6
| type Merger interface { Merge(source string, update interface{}) error }
|
可以看出,只要实现了Merge()方法的结构体都可以称为Merger,很简洁。
Broadcaster
再来看下Broadcaster,从概念的字面上即可知道这是分发用的。Broadcaster定义在/pkg/util/config.config.go中:
1 2 3 4 5 6 7 8 9 10
| type Broadcaster struct { listenerLock sync.RWMutex listeners []Listener } type Listener interface { OnUpdate(instance interface{}) }
|
Broadcaster中有多个listener。只要实现了OnUpdate()的都是Listener,kube-proxy的proxier就是一个Listener。
可以使用NewBroadcaster()生成一个Broadcaster:
1 2 3 4 5
| func NewBroadcaster() *Broadcaster { return &Broadcaster{} }
|
可以看出,新生成的Broadcaster并没有Listener,所以Broadcaster必须提供一个添加Listener的方法:
1 2 3 4 5 6
| func (b *Broadcaster) Add(listener Listener) { b.listenerLock.Lock() defer b.listenerLock.Unlock() b.listeners = append(b.listeners, listener) }
|
Broadcaster还需要提供分发的方法:
1 2 3 4 5 6 7 8 9
| func (b *Broadcaster) Notify(instance interface{}) { b.listenerLock.RLock() listeners := b.listeners b.listenerLock.RUnlock() for _, listener := range listeners { listener.OnUpdate(instance) } }
|
serviceStore
serviceStore是对多个渠道过来的数据的一个汇总的存储场所,即可以理解为是一个Merger,定义在/pkg/proxy/config/config.go中:
1 2 3 4 5
| type serviceStore struct { serviceLock sync.RWMutex services map[string]map[types.NamespacedName]api.Service updates chan<- struct{} }
|
serviceStore包含一个services字段用来存储渠道名称及该渠道客户的service;来有一个updates字段用来作消息通道用。
serviceStore提供了Merge()方法以把数据存储到自身中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| func (s *serviceStore) Merge(source string, change interface{}) error { s.serviceLock.Lock() services := s.services[source] if services == nil { services = make(map[types.NamespacedName]api.Service) } update := change.(ServiceUpdate) switch update.Op { case ADD: glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services)) for _, value := range update.Services { name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} services[name] = value } case REMOVE: glog.V(5).Infof("Removing a service %s", spew.Sdump(update)) for _, value := range update.Services { name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} delete(services, name) } case SET: glog.V(5).Infof("Setting services %s", spew.Sdump(update)) services = make(map[types.NamespacedName]api.Service) for _, value := range update.Services { name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} services[name] = value } default: glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) } s.services[source] = services s.serviceLock.Unlock() if s.updates != nil { select { case s.updates <- struct{}{}: default: glog.V(4).Infof("Service handler already has a pending interrupt.") } } return nil }
|
Merge()根据update的type来进行不同的处理。一般来说,ADD和DELETE用的不多(重新List到的数据和内存数据不一致的时候才会),而基于ETCD事件全部走SET分支。
可以看出,Merge()把某个source个来的数据都放在services中的source key下。最后,Merge()向serviceStore的update发送信号。
刚才分析过,serviceStore中存储了多个source中的数据。serviceStore必须提供方法用来获取这些数据,这个方法就是MergedState():
1 2 3 4 5 6 7 8 9 10 11 12
| func (s *serviceStore) MergedState() interface{} { s.serviceLock.RLock() defer s.serviceLock.RUnlock() services := make([]api.Service, 0) for _, sourceServices := range s.services { for _, value := range sourceServices { services = append(services, value) } } return services }
|
MergedState()把serviceStore中所有source下的service整合到一起返回。
ServiceConfig
ServiceConfig基本上是对上述结构体的一个封装,定义在/pkg/proxy/config/config.go中:
1 2 3 4 5 6 7
| type ServiceConfig struct { mux *config.Mux bcaster *config.Broadcaster store *serviceStore }
|
ServiceConfig中有一个Mux,一个Broadcaster和一个serviceStore。Mux用来管理各source channel,这些数据源的数据会被merged到serviceStore,然后经Broadcaster把serviceStore中的数据分发到各处理体中。
来看下serviceConfig的生成函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func NewServiceConfig() *ServiceConfig { updates := make(chan struct{}, 1) store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} mux := config.NewMux(store) bcaster := config.NewBroadcaster() go watchForUpdates(bcaster, store, updates) return &ServiceConfig{mux, bcaster, store} }
|
从mux := config.NewMux(store)
,mux使用store作为merger。NewServiceConfig()会使用routine运行watchForUpdates()。watchForUpdates()定义在/pkg/proxy/config/config.go中:
1 2 3 4 5 6 7 8
| func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) { for true { <-updates bcaster.Notify(accessor.MergedState()) } }
|
watchForUpdates()实现很简单,监听serviceStore的updates,如果有信号过来,就通过serviceStore的MergedState()方法收集所有数据,然后调用broadCaster的Notify()方法把数据进行分发处理。
既然是对mux进行了封装,ServiceConfig需要提供新建source channel的方法:
1 2 3 4 5 6 7 8 9 10 11
| func (c *ServiceConfig) Channel(source string) chan ServiceUpdate { ch := c.mux.Channel(source) serviceCh := make(chan ServiceUpdate) go func() { for update := range serviceCh { ch <- update } close(ch) }() return serviceCh }
|
ServiceConfig的Channel()方法在mux的source channel的基础上,还加入了serviceCh,并启动routine把serviceCh中的内容放入到mux的source channel。所以,现在只要把内容放入到serviceCh中即可,而无需关心底层的mux。
同样的,ServiceConfig也需提供注册处理函数的方法:
1 2 3 4 5 6 7
| func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnServiceUpdate()") handler.OnServiceUpdate(instance.([]api.Service)) })) }
|
RegisterHandler()方法很简单,就是把handler.OnServiceUpdate()注册到bcaster中。
SourceAPI
之前分析了数据是如何在内部处理的,屏蔽这些细节,在外部只要把数据放入serviceCh即可。现在来看下SourceAPI,看kube-proxy是如何把内容入到serviceCh中的。
NewSourceAPI()定义在/pkg/proxy/config/api.go中:
1 2 3 4 5 6 7 8 9
| func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything()) cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run() endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run() }
|
NewSourceAPI()先 获取servicesLW,然后通过生成一个Relector把servicesLW中的数据放入ServiceStore中(此处的ServiceStore和之前的不一样)。
来看下NewServiceStore()的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store { fn := func(objs []interface{}) { var services []api.Service for _, o := range objs { services = append(services, *(o.(*api.Service))) } ch <- ServiceUpdate{Op: SET, Services: services} } if store == nil { store = cache.NewStore(cache.MetaNamespaceKeyFunc) } return &cache.UndeltaStore{ Store: store, PushFunc: fn, } }
|
所以,此处的ServiceStore是一个Cache.UndeltaStore (我们只需要知道某Service最新的状态即可,无需关心中间的变化),UndeltaStore的PushFunc就是把ServiceUpdate{Op: SET, Services: services}
放入到serviceCh中。
UndeltaStore
UndeltaStore定义在/pkg/client/cache/undelta_store.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (u *UndeltaStore) Add(obj interface{}) error { if err := u.Store.Add(obj); err != nil { return err } u.PushFunc(u.Store.List()) return nil } func (u *UndeltaStore) Update(obj interface{}) error { if err := u.Store.Update(obj); err != nil { return err } u.PushFunc(u.Store.List()) return nil } func (u *UndeltaStore) Delete(obj interface{}) error { if err := u.Store.Delete(obj); err != nil { return err } u.PushFunc(u.Store.List()) return nil } func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error { if err := u.Store.Replace(list, resourceVersion); err != nil { return err } u.PushFunc(u.Store.List()) return nil }
|
可以看出,PushFunc()所接受的参数是Store.List(),即为全量的service。
调用
现在来看下,kube-proxy调用这些概念的。在/cmd/kube-proxy/app/server.go的NewProxyServerDefault()中:
1 2 3
| serviceConfig := proxyconfig.NewServiceConfig() serviceConfig.RegisterHandler(proxier)
|
此处代码先生成一个serviceConfig,然后调用RegisterHandler()方法注册proxier。proxier定义有OnUpdate()方法,所以可以当作BroadCaster的Listener。
还是在NewProxyServerDefault()中:
1 2 3 4 5 6
| proxyconfig.NewSourceAPI( client.Core().RESTClient(), config.ConfigSyncPeriod, serviceConfig.Channel("api"), endpointsConfig.Channel("api"), )
|
代码先用serviceConfig.Channel("api")
建立一个service channel,然后把该service channel作为参数传给NewSourceAPI()。现在SouceAPI会通过reflector及UndeltaStore把全量的service放入service channel中。serviceConfig又会把service channel中的内容放入到mux的source channel中。mux会调用serviceStore的Merge()把source channel中的内容合并到serviceStore中。serviceConfig中的BroadCaster会把serviceStore中的内容分发给之前注册的proxier处理。所以kube-proxy的config就是一个把多个源的内容合并,并交给处理体(可以多个)处理的过程。