This analysis covers only the userspace mode.

proxySocket

The proxySocket-related code is defined in /pkg/proxy/userspace/proxysocket.go:

// Abstraction over TCP/UDP sockets which are proxied.
type proxySocket interface {
    // Addr gets the net.Addr for a proxySocket.
    Addr() net.Addr
    // Close stops the proxySocket from accepting incoming connections.
    // Each implementation should comment on the impact of calling Close
    // while sessions are active.
    Close() error
    // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
    ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier)
    // ListenPort returns the host port that the proxySocket is listening on
    ListenPort() int
}

Any struct value that implements the Addr(), Close(), ProxyLoop(), and ListenPort() methods qualifies as a proxySocket. A proxySocket's job is to bridge a port on the physical host to a service endpoint. Let's first look at the function that creates one:

//*** Establish a listener on a host IP:port ***//
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
    host := ""
    if ip != nil {
        host = ip.String()
    }
    switch strings.ToUpper(string(protocol)) {
    case "TCP":
        //*** Listen on the host port via net.Listen() ***//
        listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
        if err != nil {
            return nil, err
        }
        return &tcpProxySocket{Listener: listener, port: port}, nil
    case "UDP":
        addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port)))
        if err != nil {
            return nil, err
        }
        conn, err := net.ListenUDP("udp", addr)
        if err != nil {
            return nil, err
        }
        return &udpProxySocket{UDPConn: conn, port: port}, nil
    }
    return nil, fmt.Errorf("unknown protocol %q", protocol)
}

As the code shows, there are two kinds of proxySocket, tcpProxySocket and udpProxySocket; we will use tcpProxySocket as the example for this analysis. newProxySocket() calls net.Listen() to listen on the given host and port, then wraps the resulting listener in a tcpProxySocket and returns it.
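
As a quick illustration, here is a hypothetical snippet (not in the source) that could be written inside the userspace package; passing port 0 lets the kernel pick a free port, and the bound address is then available from Addr():

sock, err := newProxySocket(api.ProtocolTCP, net.IPv4zero, 0) // hypothetical call
if err != nil {
    glog.Fatalf("could not create proxy socket: %v", err)
}
defer sock.Close()
glog.Infof("proxying on %s", sock.Addr().String()) // e.g. "0.0.0.0:40123"
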
Next, let's see how a proxySocket establishes a connection to one of the service's endpoints:

//*** Establish a connection to an endpoint ***//
func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
    sessionAffinityReset := false
    for _, dialTimeout := range endpointDialTimeout {
        //*** Pick a suitable endpoint ***//
        endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
        if err != nil {
            glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
            return nil, err
        }
        glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
        // TODO: This could spin up a new goroutine to make the outbound connection,
        // and keep accepting inbound traffic.
        //*** Dial the endpoint with net.DialTimeout() ***//
        outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
        if err != nil {
            if isTooManyFDsError(err) {
                panic("Dial failed: " + err.Error())
            }
            glog.Errorf("Dial failed: %v", err)
            sessionAffinityReset = true
            continue
        }
        return outConn, nil
    }
    return nil, fmt.Errorf("failed to connect to an endpoint.")
}

tryConnect() first calls the load balancer's NextEndpoint() to pick a suitable endpoint, then calls net.DialTimeout() to establish a connection to it.
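
endpointDialTimeout is a short list of increasing dial timeouts, so a failed dial is retried with a longer deadline; and because sessionAffinityReset is set to true after a failure, NextEndpoint() may hand back a different endpoint on the next round. Here is a standalone sketch of the same idiom, with made-up names and addresses:

package main

import (
    "fmt"
    "net"
    "time"
)

// dialWithEscalation mimics tryConnect(): each round asks a chooser for an
// endpoint and dials it with a progressively longer timeout.
func dialWithEscalation(pick func() string, timeouts []time.Duration) (net.Conn, error) {
    for _, timeout := range timeouts {
        ep := pick()
        conn, err := net.DialTimeout("tcp", ep, timeout)
        if err != nil {
            continue // next round: longer timeout, possibly another endpoint
        }
        return conn, nil
    }
    return nil, fmt.Errorf("all dial attempts failed")
}

func main() {
    // A toy round-robin chooser standing in for loadBalancer.NextEndpoint().
    endpoints := []string{"10.0.0.1:80", "10.0.0.2:80"} // made-up addresses
    i := 0
    pick := func() string {
        ep := endpoints[i%len(endpoints)]
        i++
        return ep
    }
    conn, err := dialWithEscalation(pick, []time.Duration{250 * time.Millisecond, 1 * time.Second})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer conn.Close()
    fmt.Println("connected to", conn.RemoteAddr())
}
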

tcpProxySocket

tcpProxySocket is defined in /pkg/proxy/userspace/proxysocket.go:

// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
// no new connections are allowed but existing connections are left untouched.
type tcpProxySocket struct {
    net.Listener
    port int
}
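
Because tcpProxySocket embeds net.Listener, the Addr() and Close() methods required by the proxySocket interface are promoted from the listener for free. A minimal, self-contained sketch of the same embedding pattern (hypothetical names, not from the source):

package main

import (
    "fmt"
    "net"
)

// wrappedListener shows the embedding trick: Accept(), Close(), and Addr()
// are promoted from the embedded net.Listener.
type wrappedListener struct {
    net.Listener
    port int
}

func main() {
    l, err := net.Listen("tcp", "127.0.0.1:0") // port 0: kernel picks a port
    if err != nil {
        panic(err)
    }
    w := wrappedListener{Listener: l, port: 0}
    fmt.Println("listening on", w.Addr()) // Addr() promoted from net.Listener
    w.Close()                             // Close() promoted as well
}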

So tcpProxySocket only needs to add the port field and implement ProxyLoop() and ListenPort() itself. Its most important method is ProxyLoop():

//*** ProxyLoop() is the standard accept loop for a net package listener; see the appendix ***//
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
    for {
        if !myInfo.isAlive() {
            // The service port was closed or replaced.
            return
        }
        // Block until a connection is made.
        //*** Accept an incoming connection ***//
        inConn, err := tcp.Accept()
        if err != nil {
            if isTooManyFDsError(err) {
                panic("Accept failed: " + err.Error())
            }
            if isClosedError(err) {
                return
            }
            if !myInfo.isAlive() {
                // Then the service port was just closed so the accept failure is to be expected.
                return
            }
            glog.Errorf("Accept failed: %v", err)
            continue
        }
        glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
        //*** Connect to an endpoint ***//
        outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier)
        if err != nil {
            glog.Errorf("Failed to connect to balancer: %v", err)
            inConn.Close()
            continue
        }
        // Spin up an async copy loop.
        //*** Relay TCP data by copying between inConn and outConn in both directions ***//
        go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
    }
}

The for loop is the standard accept-loop pattern for a net.Listener. ProxyLoop() first calls tcp.Accept() to receive a connection on the proxy port, then uses tryConnect() to open a connection to an endpoint, and finally copies data between the two connections in both directions by calling proxyTCP():

// proxyTCP proxies data bi-directionally between in and out.
func proxyTCP(in, out *net.TCPConn) {
    var wg sync.WaitGroup
    wg.Add(2)
    glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
        in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
    //*** Copy data between in and out, in both directions ***//
    go copyBytes("from backend", in, out, &wg)
    go copyBytes("to backend", out, in, &wg)
    wg.Wait()
}

func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
    defer wg.Done()
    glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr())
    n, err := io.Copy(dest, src)
    if err != nil {
        if !isClosedError(err) {
            glog.Errorf("I/O error: %v", err)
        }
    }
    glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr())
    dest.Close()
    src.Close()
}
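
Note the shutdown logic built into copyBytes(): whichever direction finishes first closes both connections, which unblocks the other goroutine's io.Copy() and lets proxyTCP()'s WaitGroup complete. A standalone sketch of the same idiom, using in-memory net.Pipe connections instead of real TCP ones:

package main

import (
    "fmt"
    "io"
    "net"
    "sync"
)

// proxy relays bytes in both directions, like proxyTCP/copyBytes.
func proxy(a, b net.Conn) {
    var wg sync.WaitGroup
    wg.Add(2)
    copyHalf := func(dst, src net.Conn) {
        defer wg.Done()
        io.Copy(dst, src) // returns on EOF or error
        dst.Close()       // closing both ends unblocks the other copier
        src.Close()
    }
    go copyHalf(a, b)
    go copyHalf(b, a)
    wg.Wait()
}

func main() {
    client, proxyIn := net.Pipe()
    proxyOut, backend := net.Pipe()
    go proxy(proxyIn, proxyOut)

    // The "backend" echoes one message and hangs up.
    go func() {
        buf := make([]byte, 5)
        io.ReadFull(backend, buf)
        backend.Write(buf)
        backend.Close()
    }()

    client.Write([]byte("hello"))
    reply := make([]byte, 5)
    io.ReadFull(client, reply)
    fmt.Println(string(reply)) // prints: hello
    client.Close()
}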

LoadBalancerRR

The LoadBalancerRR-related definitions live in /pkg/proxy/userspace/roundrobin.go:

//*** A ServicePortName contains Namespace+ServiceName plus a Port ***//
type LoadBalancerRR struct {
    lock     sync.RWMutex
    services map[proxy.ServicePortName]*balancerState
}

LoadBalancerRR holds a map from ServicePortName to balancerState. A ServicePortName looks like default/mysql:mysql; balancerState is defined as follows:

type balancerState struct {
    endpoints []string // a list of "ip:port" style strings
    index     int      // current index into endpoints
    affinity  affinityPolicy
}

A balancerState holds the list of endpoints, with index pointing at the endpoint that will be handed out next. affinityPolicy implements session affinity: packets from the same source IP are forwarded to the same endpoint. It is defined as follows:

type affinityPolicy struct {
    affinityType api.ServiceAffinity
    affinityMap  map[string]*affinityState // map client IP -> affinity info
    ttlMinutes   int
}

Its affinityMap records the mapping from source IP to affinityState, which is defined as follows:

type affinityState struct {
    clientIP string
    //clientProtocol api.Protocol //not yet used
    //sessionCookie  string       //not yet used
    endpoint string
    lastUsed time.Time
}

The key piece of information an affinityState records is the endpoint.
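
Putting the three structs together, a hypothetical snapshot from inside the package (all addresses made up) looks like this:

state := &balancerState{
    endpoints: []string{"10.244.1.5:3306", "10.244.2.7:3306"}, // made up
    index:     0, // the next pick will be endpoints[0]
    affinity: affinityPolicy{
        affinityType: api.ServiceAffinityClientIP,
        affinityMap:  map[string]*affinityState{},
        ttlMinutes:   180,
    },
}
// Once the client at 192.168.0.9 has been served, its choice is pinned:
state.affinity.affinityMap["192.168.0.9"] = &affinityState{
    clientIP: "192.168.0.9",
    endpoint: "10.244.1.5:3306",
    lastUsed: time.Now(),
}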

Since LoadBalancerRR manages the endpoints for each service port, it must provide methods to register and remove entries; these are NewService() and DeleteService().
NewService() is defined as follows:

//*** e.g. svcPort: default/mysql:mysql ***//
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error {
    glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
    lb.lock.Lock()
    defer lb.lock.Unlock()
    lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
    return nil
}

// This assumes that lb.lock is already held.
//*** Register svcPort with the LoadBalancerRR ***//
func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) *balancerState {
    if ttlMinutes == 0 {
        ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimited instead????
    }
    if _, exists := lb.services[svcPort]; !exists {
        lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
        glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
    } else if affinityType != "" {
        lb.services[svcPort].affinity.affinityType = affinityType
    }
    return lb.services[svcPort]
}

The flow is straightforward: build the structures described above and store them in the right places.
DeleteService() is defined as follows:

//*** Remove svcPort from the LoadBalancerRR ***//
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
    glog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
    lb.lock.Lock()
    defer lb.lock.Unlock()
    delete(lb.services, svcPort)
}

Deletion is simple: the svcPort entry is removed from LoadBalancerRR's services map.
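
A hypothetical registration/removal sequence, with made-up names:

lb := NewLoadBalancerRR()
svcPort := proxy.ServicePortName{
    NamespacedName: types.NamespacedName{Namespace: "default", Name: "mysql"},
    Port:           "mysql",
}
lb.NewService(svcPort, api.ServiceAffinityClientIP, 30) // 30 minute session TTL
// ...
lb.DeleteService(svcPort) // drops the whole balancerState for this key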

Next comes one of LoadBalancerRR's most important methods: NextEndpoint(), which returns an endpoint that can be used:

//*** Pick a suitable endpoint ***//
func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {
    // Coarse locking is simple. We can get more fine-grained if/when we
    // can prove it matters.
    lb.lock.Lock()
    defer lb.lock.Unlock()
    state, exists := lb.services[svcPort]
    if !exists || state == nil {
        return "", ErrMissingServiceEntry
    }
    if len(state.endpoints) == 0 {
        return "", ErrMissingEndpoints
    }
    glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints)
    sessionAffinityEnabled := isSessionAffinity(&state.affinity)
    var ipaddr string
    //*** If there is an active session, look it up in the affinityMap and return the same endpoint ***//
    if sessionAffinityEnabled {
        // Caution: don't shadow ipaddr
        var err error
        ipaddr, _, err = net.SplitHostPort(srcAddr.String())
        if err != nil {
            return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
        }
        if !sessionAffinityReset {
            sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
            if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
                // Affinity wins.
                endpoint := sessionAffinity.endpoint
                sessionAffinity.lastUsed = time.Now()
                glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s", svcPort, ipaddr, sessionAffinity, endpoint)
                return endpoint, nil
            }
        }
    }
    // Take the next endpoint.
    //*** Round-robin selection ***//
    endpoint := state.endpoints[state.index]
    state.index = (state.index + 1) % len(state.endpoints)
    //*** Update the affinityMap ***//
    if sessionAffinityEnabled {
        var affinity *affinityState
        affinity = state.affinity.affinityMap[ipaddr]
        if affinity == nil {
            affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
            state.affinity.affinityMap[ipaddr] = affinity
        }
        affinity.lastUsed = time.Now()
        affinity.endpoint = endpoint
        affinity.clientIP = ipaddr
        glog.V(4).Infof("Updated affinity key %s: %#v", ipaddr, state.affinity.affinityMap[ipaddr])
    }
    return endpoint, nil
}

NextEndpoint() looks up the balancerState for the given svcPort key in LoadBalancerRR's services map, then checks its affinity state: if the source IP already has a record there, the recorded endpoint is returned directly. Otherwise, the endpoint at index is returned, index is advanced by one to point at the next endpoint, and the session information in the affinity map is updated.
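
For example, with two endpoints registered and session affinity disabled, successive calls simply rotate (hypothetical values):

// Assume lb's state for svcPort holds endpoints ["10.0.0.1:80", "10.0.0.2:80"]
// (made-up addresses) and index 0, with affinity disabled.
src := &net.TCPAddr{IP: net.ParseIP("192.168.0.9"), Port: 54321}
e1, _ := lb.NextEndpoint(svcPort, src, false) // "10.0.0.1:80", index -> 1
e2, _ := lb.NextEndpoint(svcPort, src, false) // "10.0.0.2:80", index -> 0
e3, _ := lb.NextEndpoint(svcPort, src, false) // "10.0.0.1:80" again

With ClientIP affinity enabled, the second and third calls would instead return the same endpoint as the first, until the TTL expires or sessionAffinityReset is passed as true.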

So how does LoadBalancerRR learn about endpoints in the first place? When we analyzed ServiceConfig earlier, we saw that service data is ultimately handed to the proxier, because the proxier defines an OnServiceUpdate() method; likewise, LoadBalancerRR defines an OnEndpointsUpdate() method, so it can serve as a handler for EndpointsConfig. In NewProxyServerDefault() in /cmd/kube-proxy/app/server.go we find:

......
//*** Create the loadBalancer ***//
loadBalancer := userspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer
......
//*** Create the endpointsConfig ***//
endpointsConfig := proxyconfig.NewEndpointsConfig()
endpointsConfig.RegisterHandler(endpointsHandler)
......

Now let's look at OnEndpointsUpdate():

func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
    registeredEndpoints := make(map[proxy.ServicePortName]bool)
    lb.lock.Lock()
    defer lb.lock.Unlock()
    // Update endpoints for services.
    for i := range allEndpoints {
        //*** e.g. svcEndpoints: {{ } {ubuntu-ssh default /api/v1/namespaces/default/endpoints/ubuntu-ssh 457dbe9b-d991-11e6-8269-08002771e393 241185 0 2017-01-13 21:07:44 +0800 CST <nil> <nil> map[] map[]} [{[{172.17.0.2 0xc82030ad20}] [] [{ssh 22 TCP}]}]} ***//
        svcEndpoints := &allEndpoints[i]
        // We need to build a map of portname -> all ip:ports for that
        // portname. Explode Endpoints.Subsets[*] into this structure.
        portsToEndpoints := map[string][]hostPortPair{}
        //*** e.g. svcEndpoints.Subsets[i]: {[{172.17.0.2 0xc82032d420}] [] [{ssh 22 TCP}]} ***//
        for i := range svcEndpoints.Subsets {
            ss := &svcEndpoints.Subsets[i]
            for i := range ss.Ports {
                port := &ss.Ports[i]
                for i := range ss.Addresses {
                    addr := &ss.Addresses[i]
                    portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
                    // Ignore the protocol field - we'll get that from the Service objects.
                }
            }
        }
        for portname := range portsToEndpoints {
            //*** Build the svcPort key ***//
            svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
            state, exists := lb.services[svcPort]
            curEndpoints := []string{}
            if state != nil {
                curEndpoints = state.endpoints
            }
            newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
            if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
                glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
                lb.updateAffinityMap(svcPort, newEndpoints)
                // OnEndpointsUpdate can be called without NewService being called externally.
                // To be safe we will call it here. A new service will only be created
                // if one does not already exist. The affinity will be updated
                // later, once NewService is called.
                //*** Register svcPort and store the endpoints into its state ***//
                state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0)
                state.endpoints = slice.ShuffleStrings(newEndpoints)
                // Reset the round-robin index.
                state.index = 0
            }
            registeredEndpoints[svcPort] = true
        }
    }
    // Remove endpoints missing from the update.
    //*** Clear out services that are no longer present ***//
    for k := range lb.services {
        if _, exists := registeredEndpoints[k]; !exists {
            glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k)
            // Reset but don't delete.
            state := lb.services[k]
            state.endpoints = []string{}
            state.index = 0
            state.affinity.affinityMap = map[string]*affinityState{}
        }
    }
}

In short, OnEndpointsUpdate() takes the endpoints collected by the config layer and loads them into LoadBalancerRR.
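
To make the flattening concrete, here is a hypothetical update mirroring the debug dump in the comments above: a single Endpoints object whose one subset produces the key default/ubuntu-ssh:ssh with endpoint list ["172.17.0.2:22"]:

eps := api.Endpoints{
    ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "ubuntu-ssh"},
    Subsets: []api.EndpointSubset{{
        Addresses: []api.EndpointAddress{{IP: "172.17.0.2"}},
        Ports:     []api.EndpointPort{{Name: "ssh", Port: 22, Protocol: api.ProtocolTCP}},
    }},
}
lb.OnEndpointsUpdate([]api.Endpoints{eps})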

Proxier

Proxier is defined in /pkg/proxy/userspace/proxier.go:

// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
    loadBalancer   LoadBalancer
    mu             sync.Mutex // protects serviceMap
    serviceMap     map[proxy.ServicePortName]*serviceInfo
    syncPeriod     time.Duration
    minSyncPeriod  time.Duration // unused atm, but plumbed through
    udpIdleTimeout time.Duration
    portMapMutex   sync.Mutex
    portMap        map[portMapKey]*portMapValue
    numProxyLoops  int32 // use atomic ops to access this; mostly for testing
    listenIP       net.IP
    iptables       iptables.Interface
    hostIP         net.IP
    proxyPorts     PortAllocator
}

Proxier's most important field is serviceMap, which stores the mapping from ServicePortName to serviceInfo. serviceInfo is defined as follows:

type serviceInfo struct {
    isAliveAtomic       int32 // Only access this with atomic ops
    portal              portal
    protocol            api.Protocol
    proxyPort           int
    socket              proxySocket
    timeout             time.Duration
    activeClients       *clientCache
    nodePort            int
    loadBalancerStatus  api.LoadBalancerStatus
    sessionAffinityType api.ServiceAffinity
    stickyMaxAgeMinutes int
    // Deprecated, but required for back-compat (including e2e)
    externalIPs []string
}

Proxier's work splits into two parts: OnServiceUpdate() and SyncLoop().

OnServiceUpdate()

First, OnServiceUpdate(), defined in /pkg/proxy/userspace/proxier.go:

//*** OnServiceUpdate() is the function the config layer calls once it has collected the services ***//
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
    glog.V(4).Infof("Received update notice: %+v", services)
    activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
    for i := range services {
        service := &services[i]
        // if ClusterIP is "None" or empty, skip proxying
        if !api.IsServiceIPSet(service) {
            glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
            continue
        }
        for i := range service.Spec.Ports {
            //*** e.g. servicePort: &{ TCP 443 {0 443 } 0} ***//
            servicePort := &service.Spec.Ports[i]
            serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
            activeServices[serviceName] = true
            serviceIP := net.ParseIP(service.Spec.ClusterIP)
            info, exists := proxier.getServiceInfo(serviceName)
            // TODO: check health of the socket? What if ProxyLoop exited?
            //*** The service already exists in the proxier and is unchanged ***//
            if exists && sameConfig(info, service, servicePort) {
                // Nothing changed.
                continue
            }
            //*** The service already exists in the proxier but has changed: tear the old one down first ***//
            if exists {
                glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
                err := proxier.closePortal(serviceName, info)
                if err != nil {
                    glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
                }
                err = proxier.stopProxy(serviceName, info)
                if err != nil {
                    glog.Errorf("Failed to stop service %q: %v", serviceName, err)
                }
            }
            proxyPort, err := proxier.proxyPorts.AllocateNext()
            if err != nil {
                glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
                continue
            }
            //*** e.g. Adding new service "default/mysql:mysql" at 10.100.112.138:3306/TCP ***//
            glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
            //*** Add the new service ***//
            info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
            if err != nil {
                glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
                continue
            }
            info.portal.ip = serviceIP
            info.portal.port = int(servicePort.Port)
            info.externalIPs = service.Spec.ExternalIPs
            // Deep-copy in case the service instance changes
            info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
            info.nodePort = int(servicePort.NodePort)
            info.sessionAffinityType = service.Spec.SessionAffinity
            glog.V(4).Infof("info: %#v", info)
            err = proxier.openPortal(serviceName, info)
            if err != nil {
                glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
            }
            proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
        }
    }
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    for name, info := range proxier.serviceMap {
        if !activeServices[name] {
            glog.V(1).Infof("Stopping service %q", name)
            err := proxier.closePortal(name, info)
            if err != nil {
                glog.Errorf("Failed to close portal for %q: %v", name, err)
            }
            err = proxier.stopProxyInternal(name, info)
            if err != nil {
                glog.Errorf("Failed to stop service %q: %v", name, err)
            }
            proxier.loadBalancer.DeleteService(name)
        }
    }
}

OnServiceUpdate() is the entry point in proxier.go for handling services: it loads the services gathered by the config layer into the proxier and updates the proxier's load balancer accordingly. For each port of each service it acts according to the situation (unchanged, changed, or new). The key path is the handling of new ports, which calls addServiceOnPort():

// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
    //*** Create the socket ***//
    sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
    if err != nil {
        return nil, err
    }
    _, portStr, err := net.SplitHostPort(sock.Addr().String())
    if err != nil {
        sock.Close()
        return nil, err
    }
    portNum, err := strconv.Atoi(portStr)
    if err != nil {
        sock.Close()
        return nil, err
    }
    si := &serviceInfo{
        isAliveAtomic:       1,
        proxyPort:           portNum,
        protocol:            protocol,
        socket:              sock,
        timeout:             timeout,
        activeClients:       newClientCache(),
        sessionAffinityType: api.ServiceAffinityNone, // default
        stickyMaxAgeMinutes: 180,                     // TODO: parameterize this in the API.
    }
    //*** Save the new serviceInfo into the proxier's serviceMap ***//
    proxier.setServiceInfo(service, si)
    glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
    //*** Start a goroutine running sock.ProxyLoop() as the proxy loop ***//
    go func(service proxy.ServicePortName, proxier *Proxier) {
        defer runtime.HandleCrash()
        atomic.AddInt32(&proxier.numProxyLoops, 1)
        sock.ProxyLoop(service, si, proxier)
        atomic.AddInt32(&proxier.numProxyLoops, -1)
    }(service, proxier)
    return si, nil
}

addServiceOnPort() uses newProxySocket() from proxysocket.go to open a socket on a host port, records the resulting serviceInfo in the proxier's serviceMap, and then starts a goroutine running ProxyLoop() to accept connections and relay them to endpoints, as analyzed earlier.
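
The reverse path is worth keeping in mind: when a service goes away, the proxier marks the serviceInfo dead and closes the socket, which makes both the isAlive() check and the blocked Accept() inside ProxyLoop() return. A simplified sketch of that handshake (not the exact source):

// Simplified sketch, assuming package internals; the real code also removes
// the entry from proxier.serviceMap.
func stopProxySketch(si *serviceInfo) {
    atomic.StoreInt32(&si.isAliveAtomic, 0) // isAlive() now reports false
    si.socket.Close()                       // unblocks Accept(); isClosedError() then ends ProxyLoop()
}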

SyncLoop()

Now for the SyncLoop() part. OnServiceUpdate() stores service state in the proxier; SyncLoop() periodically reconciles that state with the actual system. Here is its definition:

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
    t := time.NewTicker(proxier.syncPeriod)
    defer t.Stop()
    for {
        <-t.C
        glog.V(6).Infof("Periodic sync")
        proxier.Sync()
    }
}

SyncLoop() calls Sync() at a fixed interval. Sync() is defined as follows:

func (proxier *Proxier) Sync() {
    if err := iptablesInit(proxier.iptables); err != nil {
        glog.Errorf("Failed to ensure iptables: %v", err)
    }
    proxier.ensurePortals()
    proxier.cleanupStaleStickySessions()
}

Sync() calls ensurePortals(), defined as follows:

// Ensure that portals exist for all services.
func (proxier *Proxier) ensurePortals() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    // NB: This does not remove rules that should not be present.
    for name, info := range proxier.serviceMap {
        err := proxier.openPortal(name, info)
        if err != nil {
            glog.Errorf("Failed to ensure portal for %q: %v", name, err)
        }
    }
}

ensurePortals() iterates over serviceMap and calls openPortal() for each entry:

//*** Open portals for the cluster IP, externalIPs, Ingress IPs, and nodePort ***//
func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
    err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
    if err != nil {
        return err
    }
    //*** Handle externalIPs ***//
    for _, publicIP := range info.externalIPs {
        err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)
        if err != nil {
            return err
        }
    }
    //*** Handle load balancer Ingress IPs ***//
    for _, ingress := range info.loadBalancerStatus.Ingress {
        if ingress.IP != "" {
            err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)
            if err != nil {
                return err
            }
        }
    }
    //*** Handle nodePort ***//
    if info.nodePort != 0 {
        err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)
        if err != nil {
            return err
        }
    }
    return nil
}

Now openOnePortal():

func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
    if local, err := isLocalIP(portal.ip); err != nil {
        return fmt.Errorf("can't determine if IP %s is local, assuming not: %v", portal.ip, err)
    } else if local {
        err := proxier.claimNodePort(portal.ip, portal.port, protocol, name)
        if err != nil {
            return err
        }
    }
    // Handle traffic from containers.
    //*** Install the iptables rule for traffic from containers ***//
    args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
    //*** iptables.go defines: Append RulePosition = "-A" ***//
    existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
    if err != nil {
        glog.Errorf("Failed to install iptables %s rule for service %q, args:%v", iptablesContainerPortalChain, name, args)
        return err
    }
    if !existed {
        glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
    }
    if portal.isExternal {
        args := proxier.iptablesContainerPortalArgs(portal.ip, false, true, portal.port, protocol, proxyIP, proxyPort, name)
        existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
        if err != nil {
            glog.Errorf("Failed to install iptables %s rule that opens service %q for local traffic, args:%v", iptablesContainerPortalChain, name, args)
            return err
        }
        if !existed {
            glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d for local traffic", name, protocol, portal.ip, portal.port)
        }
        args = proxier.iptablesHostPortalArgs(portal.ip, true, portal.port, protocol, proxyIP, proxyPort, name)
        existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
        if err != nil {
            glog.Errorf("Failed to install iptables %s rule for service %q for dst-local traffic", iptablesHostPortalChain, name)
            return err
        }
        if !existed {
            glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d for dst-local traffic", name, protocol, portal.ip, portal.port)
        }
        return nil
    }
    // Handle traffic from the host.
    //*** Install the iptables rule for traffic from the host ***//
    args = proxier.iptablesHostPortalArgs(portal.ip, false, portal.port, protocol, proxyIP, proxyPort, name)
    existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
    if err != nil {
        glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
        return err
    }
    if !existed {
        glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
    }
    return nil
}

openOnePortal() installs two main kinds of iptables rules: one for traffic from containers and one for traffic from the host. With these in place, the service IP is wired to a port on the physical host: packets sent to the service IP are redirected to that host port, where the proxy picks an endpoint and opens a connection to it. The host port is thus the link between the service IP and the endpoints.
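
For a concrete feel: for a service at 10.100.112.138:3306/TCP proxied on a hypothetical host port 40000, the two NAT rules have roughly the following shape (the chain names come from the source; the exact matchers vary by version):

-A KUBE-PORTALS-CONTAINER -d 10.100.112.138/32 -p tcp --dport 3306 -j REDIRECT --to-ports 40000
-A KUBE-PORTALS-HOST -d 10.100.112.138/32 -p tcp --dport 3306 -j DNAT --to-destination <hostIP>:40000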

This concludes the analysis of the proxy.