In Kubernetes, kube-proxy is responsible for directing service traffic to concrete pods, so it needs to obtain service and endpoint information from the apiserver. Acquiring that information is managed through "config". The config here is not configuration in the usual sense; it is better understood as management of information sources. Kube-proxy only supports the apiserver as a source, whereas kubelet can pull information from three sources: apiserver, file, and http. This analysis covers kube-proxy's config: how kube-proxy learns about service and endpoint changes from the apiserver, and how it hands those changes to the handler functions.
kube-proxy has both a ServiceConfig and an EndpointsConfig; we will use ServiceConfig as the example.
ServiceConfig is how kube-proxy collects services from its sources: after the mux's Merge(), each source's data is filed under its source key in the serviceStore. Finally, the Broadcaster merges the services from all sources and hands the result to the registered handlers.
Let's start with two concepts: Mux and Broadcaster.

Mux

A Mux can create multiple channels and, together with the corresponding config's storage, merge the data flowing through them. Mux is defined in /pkg/util/config/config.go:

// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
	// Invoked when an update is sent to a source.
	merger Merger
	// Sources and their lock.
	sourceLock sync.RWMutex
	// Maps source names to channels
	sources map[string]chan interface{}
}

Here, the Merger is responsible for merging data, and sources maps source names to their channels.
A Mux is created with NewMux():

// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
	mux := &Mux{
		sources: make(map[string]chan interface{}),
		merger:  merger,
	}
	return mux
}

A Merger must be supplied when the Mux is created, and sources starts out empty. So how is a source channel created, and how does its content reach the Merger? See Mux's Channel() method:

func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}

The Channel() method creates a channel, starts a goroutine running listen(), and returns the channel. listen() simply ranges over the newly created channel and passes each update to merger.Merge(). So by calling Mux's Channel() we obtain a channel; anything we write to it is automatically handed to the merger.

Next, what is a Merger? The Merger interface is defined in /pkg/util/config/config.go:

type Merger interface {
	// Invoked when a change from a source is received. May also function as an incremental
	// merger if you wish to consume changes incrementally. Must be reentrant when more than
	// one source is defined.
	Merge(source string, update interface{}) error
}

As you can see, any struct that implements Merge() qualifies as a Merger; very concise.
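To make the Mux/Merger interplay concrete, here is a self-contained sketch of the pattern. This is not the Kubernetes code itself: Mux is trimmed down (plain goroutines instead of wait.Until, a plain Mutex), and recordingMerger is a hypothetical Merger that just records what it receives.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Merger mirrors the interface from /pkg/util/config/config.go.
type Merger interface {
	Merge(source string, update interface{}) error
}

// Mux is a trimmed-down version of the k8s Mux: one goroutine per source
// channel, each forwarding updates to the shared Merger.
type Mux struct {
	merger     Merger
	sourceLock sync.Mutex
	sources    map[string]chan interface{}
}

func NewMux(merger Merger) *Mux {
	return &Mux{merger: merger, sources: make(map[string]chan interface{})}
}

func (m *Mux) Channel(source string) chan interface{} {
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan interface{})
	m.sources[source] = ch
	go func() { // plays the role of listen()
		for update := range ch {
			m.merger.Merge(source, update)
		}
	}()
	return ch
}

// recordingMerger is a toy Merger that records which source sent which update.
type recordingMerger struct {
	mu      sync.Mutex
	updates []string
	done    chan struct{}
}

func (r *recordingMerger) Merge(source string, update interface{}) error {
	r.mu.Lock()
	r.updates = append(r.updates, fmt.Sprintf("%s=%v", source, update))
	r.mu.Unlock()
	r.done <- struct{}{}
	return nil
}

func main() {
	merger := &recordingMerger{done: make(chan struct{}, 2)}
	mux := NewMux(merger)
	// Writing to a source channel is all a producer has to do.
	mux.Channel("api") <- "svc-a"
	mux.Channel("file") <- "svc-b"
	<-merger.done
	<-merger.done
	sort.Strings(merger.updates) // goroutine order is nondeterministic; sort for stable output
	fmt.Println(merger.updates)  // [api=svc-a file=svc-b]
}
```

Both sources funnel into the single Merger, which is exactly how multiple source channels end up merged into one serviceStore.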

Broadcaster

Now for the Broadcaster; as the name suggests, it distributes data. Broadcaster is defined in /pkg/util/config/config.go:

type Broadcaster struct {
	// Listeners for changes and their lock.
	listenerLock sync.RWMutex
	listeners    []Listener
}

type Listener interface {
	// OnUpdate is invoked when a change is made to an object.
	OnUpdate(instance interface{})
}

A Broadcaster holds multiple listeners. Anything that implements OnUpdate() is a Listener; kube-proxy's proxier is one such Listener.
A Broadcaster is created with NewBroadcaster():

// NewBroadcaster registers a set of listeners that support the Listener interface
// and notifies them all on changes.
func NewBroadcaster() *Broadcaster {
	return &Broadcaster{}
}

A freshly created Broadcaster has no Listeners, so Broadcaster must provide a method for adding them:

// Add registers listener to receive updates of changes.
func (b *Broadcaster) Add(listener Listener) {
	b.listenerLock.Lock()
	defer b.listenerLock.Unlock()
	b.listeners = append(b.listeners, listener)
}

Broadcaster also needs a method to distribute updates:

// Notify notifies all listeners.
func (b *Broadcaster) Notify(instance interface{}) {
	b.listenerLock.RLock()
	listeners := b.listeners
	b.listenerLock.RUnlock()
	for _, listener := range listeners {
		listener.OnUpdate(instance)
	}
}

serviceStore

serviceStore is where data arriving from multiple sources is aggregated; in other words, it can be understood as a Merger. It is defined in /pkg/proxy/config/config.go:

type serviceStore struct {
	serviceLock sync.RWMutex
	services    map[string]map[types.NamespacedName]api.Service
	updates     chan<- struct{}
}

serviceStore has a services field that stores, per source name, the services received from that source, and an updates field that serves as a signaling channel.
serviceStore implements Merge() to store incoming data into itself:

//***store the incoming data into the serviceStore***//
func (s *serviceStore) Merge(source string, change interface{}) error {
	s.serviceLock.Lock()
	services := s.services[source]
	if services == nil {
		services = make(map[types.NamespacedName]api.Service)
	}
	update := change.(ServiceUpdate)
	switch update.Op {
	case ADD:
		glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
		for _, value := range update.Services {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			services[name] = value
		}
	case REMOVE:
		glog.V(5).Infof("Removing a service %s", spew.Sdump(update))
		for _, value := range update.Services {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			delete(services, name)
		}
	case SET:
		glog.V(5).Infof("Setting services %s", spew.Sdump(update))
		// Clear the old map entries by just creating a new map
		services = make(map[types.NamespacedName]api.Service)
		for _, value := range update.Services {
			name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
			services[name] = value
		}
	default:
		glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
	}
	s.services[source] = services
	s.serviceLock.Unlock()
	if s.updates != nil {
		// Since we record the snapshot before sending this signal, it's
		// possible that the consumer ends up performing an extra update.
		select {
		case s.updates <- struct{}{}:
		default:
			glog.V(4).Infof("Service handler already has a pending interrupt.")
		}
	}
	return nil
}

Merge() dispatches on the update's Op. In practice ADD and REMOVE are rarely used (only when a fresh List disagrees with the in-memory data); updates driven by etcd events all take the SET branch.
As shown, Merge() files everything coming from a given source under that source's key in services. Finally, Merge() sends a signal on serviceStore's updates channel.

As analyzed above, serviceStore holds data from multiple sources, so it must provide a way to read them back out; that method is MergedState():

//***return the services merged across all sources***//
func (s *serviceStore) MergedState() interface{} {
	s.serviceLock.RLock()
	defer s.serviceLock.RUnlock()
	services := make([]api.Service, 0)
	for _, sourceServices := range s.services {
		for _, value := range sourceServices {
			services = append(services, value)
		}
	}
	return services
}

MergedState() gathers the services under every source in serviceStore into a single slice and returns it.
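The flattening step can be sketched in isolation. Here strings stand in for api.Service, and mergedState() is a hypothetical stand-in for the method above:

```go
package main

import (
	"fmt"
	"sort"
)

// mergedState flattens the per-source maps into one slice, mirroring what
// serviceStore.MergedState() does (strings stand in for api.Service).
func mergedState(services map[string]map[string]string) []string {
	merged := make([]string, 0)
	for _, sourceServices := range services {
		for _, svc := range sourceServices {
			merged = append(merged, svc)
		}
	}
	sort.Strings(merged) // map iteration order is random; sort for stable output
	return merged
}

func main() {
	services := map[string]map[string]string{
		"api":  {"default/web": "web-svc"},
		"file": {"kube-system/dns": "dns-svc"},
	}
	fmt.Println(mergedState(services)) // [dns-svc web-svc]
}
```

Note the result is always the complete state across all sources; this is what ultimately gets broadcast to the handlers.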

ServiceConfig

ServiceConfig is essentially a wrapper around the structures above, defined in /pkg/proxy/config/config.go:

// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
	mux     *config.Mux
	bcaster *config.Broadcaster
	store   *serviceStore
}

ServiceConfig bundles a Mux, a Broadcaster, and a serviceStore. The Mux manages the per-source channels; the data from those sources is merged into the serviceStore, and the Broadcaster then distributes the serviceStore's contents to the handlers.
Here is ServiceConfig's constructor:

// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
	// The updates channel is used to send interrupts to the Services handler.
	// It's buffered because we never want to block for as long as there is a
	// pending interrupt, but don't want to drop them if the handler is doing
	// work.
	updates := make(chan struct{}, 1)
	store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
	mux := config.NewMux(store)
	bcaster := config.NewBroadcaster()
	//***start collecting the services in serviceStore and handing them to the listeners registered on bcaster***//
	go watchForUpdates(bcaster, store, updates)
	return &ServiceConfig{mux, bcaster, store}
}

With mux := config.NewMux(store), the mux uses the store as its merger. NewServiceConfig() also starts watchForUpdates() in a goroutine. watchForUpdates() is defined in /pkg/proxy/config/config.go:

// watchForUpdates invokes bcaster.Notify() with the latest version of an object
// when changes occur.
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
	for true {
		<-updates
		bcaster.Notify(accessor.MergedState())
	}
}

watchForUpdates() is straightforward: it blocks on serviceStore's updates channel; whenever a signal arrives, it collects the full data set via serviceStore's MergedState() and distributes it through the Broadcaster's Notify().
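The producer/consumer hand-off between Merge() and watchForUpdates() can be sketched end to end. toyStore below is a hypothetical stand-in for serviceStore (strings instead of api.Service, and one watch iteration run inline instead of a forever loop):

```go
package main

import "fmt"

// toyStore sketches the serviceStore/watchForUpdates hand-off: merges mutate
// the map and signal a 1-slot channel; the watcher drains the channel and
// then reads the full merged state.
type toyStore struct {
	services map[string]string
	updates  chan struct{}
}

func newToyStore() *toyStore {
	return &toyStore{services: map[string]string{}, updates: make(chan struct{}, 1)}
}

func (s *toyStore) merge(name, svc string) {
	s.services[name] = svc
	select { // non-blocking signal, as in serviceStore.Merge()
	case s.updates <- struct{}{}:
	default:
	}
}

func main() {
	s := newToyStore()
	s.merge("default/web", "web-svc")
	s.merge("default/db", "db-svc") // coalesced: still only one pending interrupt

	<-s.updates // watchForUpdates(): wait for a signal...
	fmt.Println("notify with", len(s.services), "services")
}
```

Two rapid merges produce a single notification, but the snapshot the watcher reads already contains both services; this is exactly why dropping extra interrupts is safe.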

Since ServiceConfig wraps the mux, it needs to expose its own method for creating source channels:

func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
	ch := c.mux.Channel(source)
	serviceCh := make(chan ServiceUpdate)
	go func() {
		for update := range serviceCh {
			ch <- update
		}
		close(ch)
	}()
	return serviceCh
}

ServiceConfig's Channel() method layers a serviceCh on top of the mux's source channel and starts a goroutine that forwards the contents of serviceCh into it. Callers therefore only need to write to serviceCh, without caring about the underlying mux.
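This is a typed-channel adapter: the front channel carries concrete ServiceUpdate values, the back channel carries interface{}. A self-contained sketch (ServiceUpdate here is a hypothetical stub, and typedChannel() stands in for the wrapping logic in Channel()):

```go
package main

import "fmt"

// ServiceUpdate stands in for the real update type.
type ServiceUpdate struct{ Op string }

// typedChannel mirrors ServiceConfig.Channel(): a typed front channel whose
// contents a goroutine forwards into the untyped mux channel behind it.
func typedChannel(raw chan interface{}) chan ServiceUpdate {
	serviceCh := make(chan ServiceUpdate)
	go func() {
		for update := range serviceCh {
			raw <- update
		}
		close(raw) // closing the typed channel tears down the raw one too
	}()
	return serviceCh
}

func main() {
	raw := make(chan interface{}) // plays the role of mux.Channel("api")
	serviceCh := typedChannel(raw)
	go func() {
		serviceCh <- ServiceUpdate{Op: "SET"}
		close(serviceCh)
	}()
	for update := range raw {
		fmt.Println("mux received op:", update.(ServiceUpdate).Op)
	}
}
```

The design gives callers compile-time type safety at the edge while the generic Mux machinery stays untyped internally.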

Likewise, ServiceConfig must provide a way to register handlers:

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
	//***register handler.OnServiceUpdate() as a listener on bcaster***//
	c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
		glog.V(3).Infof("Calling handler.OnServiceUpdate()")
		handler.OnServiceUpdate(instance.([]api.Service))
	}))
}

RegisterHandler() is simple: it wraps handler.OnServiceUpdate() in a ListenerFunc and registers it with the bcaster.

SourceAPI

So far we have analyzed how data is processed internally; hiding those details, from the outside we only need to put data into serviceCh. Now let's look at SourceAPI to see how kube-proxy actually feeds serviceCh.
NewSourceAPI() is defined in /pkg/proxy/config/api.go:

//***push the full set of services into servicesChan***//
//***push the full set of endpoints into endpointsChan***//
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
	servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
	cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run()
	endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())
	cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run()
}

NewSourceAPI() first builds servicesLW, then creates a Reflector that pours the data from servicesLW into a ServiceStore (note: this ServiceStore is different from the serviceStore discussed earlier).
Here is NewServiceStore():

func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store {
	fn := func(objs []interface{}) {
		var services []api.Service
		for _, o := range objs {
			services = append(services, *(o.(*api.Service)))
		}
		ch <- ServiceUpdate{Op: SET, Services: services}
	}
	if store == nil {
		store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	}
	return &cache.UndeltaStore{
		Store:    store,
		PushFunc: fn,
	}
}

So this ServiceStore is a cache.UndeltaStore (we only need each Service's latest state, not the intermediate changes), and the UndeltaStore's PushFunc sends ServiceUpdate{Op: SET, Services: services} into serviceCh.

UndeltaStore

UndeltaStore is defined in /pkg/client/cache/undelta_store.go:

func (u *UndeltaStore) Add(obj interface{}) error {
	if err := u.Store.Add(obj); err != nil {
		return err
	}
	u.PushFunc(u.Store.List())
	return nil
}

func (u *UndeltaStore) Update(obj interface{}) error {
	if err := u.Store.Update(obj); err != nil {
		return err
	}
	u.PushFunc(u.Store.List())
	return nil
}

func (u *UndeltaStore) Delete(obj interface{}) error {
	if err := u.Store.Delete(obj); err != nil {
		return err
	}
	u.PushFunc(u.Store.List())
	return nil
}

func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
	if err := u.Store.Replace(list, resourceVersion); err != nil {
		return err
	}
	u.PushFunc(u.Store.List())
	return nil
}

As you can see, the argument PushFunc() receives is always Store.List(), i.e. the full set of services.
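The "push the whole list on every mutation" behavior is easy to demonstrate in miniature. pushStore below is a hypothetical toy version of cache.UndeltaStore (string values instead of objects, and sorted output since map iteration order is random):

```go
package main

import (
	"fmt"
	"sort"
)

// pushStore is a toy cache.UndeltaStore: every mutation pushes the FULL
// current list, so the consumer only ever sees complete latest state.
type pushStore struct {
	items    map[string]string
	pushFunc func([]string)
}

func newPushStore(push func([]string)) *pushStore {
	return &pushStore{items: map[string]string{}, pushFunc: push}
}

func (p *pushStore) list() []string {
	all := make([]string, 0, len(p.items))
	for _, v := range p.items {
		all = append(all, v)
	}
	sort.Strings(all)
	return all
}

func (p *pushStore) Add(key, val string) {
	p.items[key] = val
	p.pushFunc(p.list()) // push the whole list, not the delta
}

func (p *pushStore) Delete(key string) {
	delete(p.items, key)
	p.pushFunc(p.list())
}

func main() {
	store := newPushStore(func(all []string) { fmt.Println("SET", all) })
	store.Add("default/web", "web-svc") // SET [web-svc]
	store.Add("default/db", "db-svc")   // SET [db-svc web-svc]
	store.Delete("default/web")         // SET [db-svc]
}
```

This is what makes the downstream SET branch of serviceStore.Merge() sufficient: every push is a complete replacement, so no delta tracking is needed anywhere in the pipeline.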

Usage

Finally, let's see how kube-proxy wires these pieces together. In NewProxyServerDefault() in /cmd/kube-proxy/app/server.go:

//***create the serviceConfig***//
serviceConfig := proxyconfig.NewServiceConfig()
serviceConfig.RegisterHandler(proxier)

This code first creates a serviceConfig, then registers the proxier via RegisterHandler(). The proxier implements OnServiceUpdate(), which RegisterHandler() wraps in a ListenerFunc, so it can act as a Broadcaster Listener.
Still in NewProxyServerDefault():

proxyconfig.NewSourceAPI(
	client.Core().RESTClient(),
	config.ConfigSyncPeriod,
	serviceConfig.Channel("api"),
	endpointsConfig.Channel("api"),
)

The code first creates a service channel with serviceConfig.Channel("api") and passes it to NewSourceAPI(). From this point on, SourceAPI pushes the full set of services into the service channel via the reflector and the UndeltaStore; serviceConfig forwards the channel's contents into the mux's source channel; the mux calls serviceStore's Merge() to fold that data in under its source key; and serviceConfig's Broadcaster distributes the serviceStore's contents to the previously registered proxier. In short, kube-proxy's config is a pipeline that merges content from multiple sources and hands it to one or more handlers.