本次分析只介绍userspace模式。
proxySocket
proxySocket相关的内容定义在/pkg/proxy/userspace/proxysocket.go中:
// proxySocket is the per-protocol abstraction over a local listening socket
// used by the userspace proxy. Implementations accept/receive traffic on a
// host port and forward it to a service endpoint chosen by the load balancer.
type proxySocket interface {
// Addr returns the local address the socket is bound to.
Addr() net.Addr
// Close releases the underlying socket.
Close() error
// ProxyLoop blocks, proxying traffic between this socket and endpoints of
// the given service, until the serviceInfo is no longer alive.
ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier)
// ListenPort returns the local port number being listened on.
ListenPort() int
}
只要定义了Addr()、Close()、ProxyLoop()、ListenPort()方法的结构体的值都可以称为proxySocket。proxySocket的具体功能是建立物理主机上某一端口到endpoint的连接。先来看下proxySocket的生成函数:
// newProxySocket opens a listening socket for the given protocol on ip:port
// (an empty host, i.e. all interfaces, when ip is nil) and wraps it in the
// matching proxySocket implementation. Unknown protocols yield an error.
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
	host := ""
	if ip != nil {
		host = ip.String()
	}
	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
	switch strings.ToUpper(string(protocol)) {
	case "TCP":
		l, lerr := net.Listen("tcp", hostPort)
		if lerr != nil {
			return nil, lerr
		}
		return &tcpProxySocket{Listener: l, port: port}, nil
	case "UDP":
		udpAddr, aerr := net.ResolveUDPAddr("udp", hostPort)
		if aerr != nil {
			return nil, aerr
		}
		udpConn, cerr := net.ListenUDP("udp", udpAddr)
		if cerr != nil {
			return nil, cerr
		}
		return &udpProxySocket{UDPConn: udpConn, port: port}, nil
	}
	return nil, fmt.Errorf("unknown protocol %q", protocol)
}
可以看出,proxySocket有tcpProxySocket和udpProxySocket两种,我们以tcpProxySocket为例进行分析。newProxySocket()函数会调用net.Listen()函数监听host上的端口port,然后把获取到的listener封装成tcpProxySocket返回。再来看下proxySocket如何与service的endpoint建立连接:
// tryConnect asks the proxier's load balancer for an endpoint of the service
// and dials it, retrying once per entry of endpointDialTimeout with that
// entry's timeout. After a failed dial, session affinity for srcAddr is reset
// so the next pick is not pinned to the unreachable endpoint. Returns the
// established connection, or an error when no endpoint could be reached.
func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
	resetAffinity := false
	for _, timeout := range endpointDialTimeout {
		endpoint, pickErr := proxier.loadBalancer.NextEndpoint(service, srcAddr, resetAffinity)
		if pickErr != nil {
			glog.Errorf("Couldn't find an endpoint for %s: %v", service, pickErr)
			return nil, pickErr
		}
		glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
		conn, dialErr := net.DialTimeout(protocol, endpoint, timeout)
		if dialErr == nil {
			return conn, nil
		}
		// Out of file descriptors is unrecoverable here; crash loudly.
		if isTooManyFDsError(dialErr) {
			panic("Dial failed: " + dialErr.Error())
		}
		glog.Errorf("Dial failed: %v", dialErr)
		resetAffinity = true
	}
	return nil, fmt.Errorf("failed to connect to an endpoint.")
}
tryConnect()先调用loadbalancer的NextEndpoint()函数获取一个合适的endpoint, 然后调用net.DialTimeout()函数建立与endpoint的连接。
tcpProxySocket
tcpProxySocket定义在/pkg/proxy/userspace/proxysocket.go中:
// tcpProxySocket implements proxySocket for TCP. It embeds the net.Listener
// that accepts inbound connections; port records the local listen port.
type tcpProxySocket struct {
net.Listener
port int
}
tcpProxySocket内嵌了net.Listener,还有字段port。tcpProxySocket最重要的方法就是ProxyLoop():
// ProxyLoop accepts TCP connections on the proxy socket and, for each one,
// dials a service endpoint via tryConnect and spawns proxyTCP to shuttle
// bytes between the two connections. It returns when the service is no
// longer alive or the listener is closed.
func (tcp *tcpProxySocket) ProxyLoop (service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
for {
// Stop accepting as soon as the service has been torn down.
if !myInfo.isAlive() {
return
}
inConn, err := tcp.Accept()
if err != nil {
// Out of file descriptors is unrecoverable; crash loudly.
if isTooManyFDsError(err) {
panic ("Accept failed: " + err.Error())
}
// Listener was closed (normal shutdown path).
if isClosedError(err) {
return
}
// Re-check liveness: the error may be due to service teardown.
if !myInfo.isAlive() {
return
}
glog.Errorf("Accept failed: %v" , err)
continue
}
glog.V(3 ).Infof("Accepted TCP connection from %v to %v" , inConn.RemoteAddr(), inConn.LocalAddr())
outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp" , proxier)
if err != nil {
// No endpoint reachable: drop the client connection and keep serving.
glog.Errorf("Failed to connect to balancer: %v" , err)
inConn.Close()
continue
}
// Proxy the pair in the background; Accept loop continues immediately.
go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
}
}
其中for循环是net.Listen()机制的固定用法。ProxyLoop()先使用tcp.Accept()获取端口的连接,然后再通过tryConnect()建立与endpoint的连接,然后再把这两个连接上的数据进行相互拷贝,即调用proxyTCP()函数:
// proxyTCP shuttles bytes in both directions between the client connection
// (in) and the endpoint connection (out), blocking until both copy
// goroutines have finished.
func proxyTCP(in, out *net.TCPConn) {
	glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
		in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
	var group sync.WaitGroup
	group.Add(2)
	go copyBytes("from backend", in, out, &group)
	go copyBytes("to backend", out, in, &group)
	group.Wait()
}
// copyBytes streams bytes from src to dest until EOF or error, then closes
// both connections so the peer goroutine's io.Copy also terminates.
// direction is a human-readable label used only for logging; wg is signalled
// when the copy finishes.
func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
	defer wg.Done()
	glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr())
	n, err := io.Copy(dest, src)
	if err != nil && !isClosedError(err) {
		glog.Errorf("I/O error: %v", err)
	}
	glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr())
	dest.Close()
	src.Close()
}
LoadBalancerRR
LoadBalancerRR的相关概念定义在/pkg/proxy/userspace/roundrobin.go中:
// LoadBalancerRR is a round-robin load balancer mapping each service port to
// its balancer state (endpoints, rotation index, session affinity).
type LoadBalancerRR struct {
// lock guards services against concurrent access.
lock sync.RWMutex
services map [proxy.ServicePortName]*balancerState
}
LoadBalancerRR定义了ServicePortName和balancerState的map关系。其中ServicePortName类似default/mysql:mysql形式;balancerState定义如下:
// balancerState holds the round-robin state for one service port.
type balancerState struct {
// endpoints are the candidate "ip:port" backends for this service port.
endpoints []string
// index points at the next endpoint to hand out in round-robin order.
index int
// affinity records session-affinity settings and per-client-IP stickiness.
affinity affinityPolicy
}
balancerState中存有多个endpoint,index指向目前使用的endpoint的下一个endpoint。affinityPolicy用来标识session会话,对于同一个源IP的包,转发到同一个endpoint上。affinityPolicy定义如下:
// affinityPolicy describes session affinity for a service port: whether it
// is enabled, the per-client state, and how long stickiness lasts.
type affinityPolicy struct {
affinityType api.ServiceAffinity
// affinityMap maps a client source IP to its sticky-endpoint record.
affinityMap map [string ]*affinityState
// ttlMinutes is how long an affinity record remains valid since last use.
ttlMinutes int
}
其中affinityMap记录了源IP和affinityState的map关系。affinityState定义如下:
// affinityState is one client's sticky-session record: the client IP, the
// endpoint it was pinned to, and when it was last used (for TTL expiry).
type affinityState struct {
clientIP string
endpoint string
lastUsed time.Time
}
affinityState最重要的是记录了endpoint。
我们知道LoadBalancerRR是用来管理endpoint的,所以需要提供注册及删除endpoint的方法。LoadBalancerRR提供了NewService()及DeleteService()来管理增删操作。NewService()定义如下:
// NewService registers svcPort with the balancer under lb.lock, creating its
// state with the given session-affinity settings if absent (see
// newServiceInternal). It always returns nil.
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error {
	glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
	lb.lock.Lock()
	defer lb.lock.Unlock()
	lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
	return nil
}
// newServiceInternal returns the balancer state for svcPort, creating it with
// the given affinity settings when it does not exist. A ttl of 0 means the
// 180-minute default. An existing entry only has its affinityType updated
// (and only when affinityType is non-empty). Caller must hold lb.lock.
func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) *balancerState {
	if ttlMinutes == 0 {
		ttlMinutes = 180 // default affinity TTL
	}
	state, exists := lb.services[svcPort]
	if !exists {
		state = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
		lb.services[svcPort] = state
		glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
	} else if affinityType != "" {
		state.affinity.affinityType = affinityType
	}
	return state
}
基本的流程就是生成上述相关概念,然后存储到相应的位置。DeleteService()定义如下:
// DeleteService removes svcPort's balancer state entirely, under lb.lock.
// Deleting a non-existent entry is a no-op.
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
	glog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
	lb.lock.Lock()
	defer lb.lock.Unlock()
	delete(lb.services, svcPort)
}
删除流程比较简单,直接把svcPort从LoadBalancerRR services map中删除即可。
接下来看下LoadBalancerRR的一个非常重要的方法:NextEndpoint()。NextEndpoint()是用来返回一个可以使用的endpoint的:
// NextEndpoint picks an endpoint for svcPort. With session affinity enabled
// and a fresh (unexpired) record for srcAddr's IP, the recorded endpoint is
// reused; otherwise the next endpoint in round-robin order is returned and,
// when affinity is enabled, recorded for the client IP.
// sessionAffinityReset forces a new round-robin pick even when a record exists.
func (lb *LoadBalancerRR) NextEndpoint (svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool ) (string , error) {
lb.lock.Lock()
defer lb.lock.Unlock()
state, exists := lb.services[svcPort]
if !exists || state == nil {
return "" , ErrMissingServiceEntry
}
if len (state.endpoints) == 0 {
return "" , ErrMissingEndpoints
}
glog.V(4 ).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v" , svcPort, srcAddr, state.endpoints)
sessionAffinityEnabled := isSessionAffinity(&state.affinity)
var ipaddr string
if sessionAffinityEnabled {
// Affinity is keyed on the client's source IP (port stripped).
var err error
ipaddr, _, err = net.SplitHostPort(srcAddr.String())
if err != nil {
return "" , fmt.Errorf("malformed source address %q: %v" , srcAddr.String(), err)
}
if !sessionAffinityReset {
sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
// Reuse the sticky endpoint only while the record is within its TTL.
if exists && int (time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
endpoint := sessionAffinity.endpoint
sessionAffinity.lastUsed = time.Now()
glog.V(4 ).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s" , svcPort, ipaddr, sessionAffinity, endpoint)
return endpoint, nil
}
}
}
// Round-robin: take the endpoint at index, then advance (wrapping).
endpoint := state.endpoints[state.index]
state.index = (state.index + 1 ) % len (state.endpoints)
if sessionAffinityEnabled {
// Record (or refresh) the sticky mapping for this client IP.
var affinity *affinityState
affinity = state.affinity.affinityMap[ipaddr]
if affinity == nil {
affinity = new (affinityState)
state.affinity.affinityMap[ipaddr] = affinity
}
affinity.lastUsed = time.Now()
affinity.endpoint = endpoint
affinity.clientIP = ipaddr
glog.V(4 ).Infof("Updated affinity key %s: %#v" , ipaddr, state.affinity.affinityMap[ipaddr])
}
return endpoint, nil
}
NextEndpoint()会从LoadBalancerRR的services中获取对应svcPort key下的balancerState,然后检查balancerState的affinity,如果源IP在这边有记录,则直接返回记录中的endpoint;否则返回balancerState中index指向的endpoint,并把index加1以指向下一个endpoint;然后更新affinity中的会话信息。
那么,LoadBalancerRR是如何获取endpoint的信息的呢?我们之前分析ServiceConfig时,提到数据最后交由proxier处理,因为proxier定义有OnServiceUpdate()方法;同样的,LoadBalancerRR定义了OnEndpointsUpdate()方法,可以当作EndpointsConfig的一个handler。在/cmd/kube-proxy/app/server.go的NewProxyServerDefault()函数中,有:
......
loadBalancer := userspace.NewLoadBalancerRR()
endpointsHandler = loadBalancer
......
endpointsConfig := proxyconfig.NewEndpointsConfig()
endpointsConfig.RegisterHandler(endpointsHandler)
......
还是来看下OnEndpointsUpdate()吧:
// OnEndpointsUpdate reconciles the balancer's endpoint lists with the full
// set of Endpoints objects from the apiserver: it rebuilds (and shuffles)
// the endpoint list of every service port whose endpoints changed, and
// clears the state of service ports no longer present in the update.
func (lb *LoadBalancerRR) OnEndpointsUpdate (allEndpoints []api.Endpoints) {
// Tracks which service ports appear in this update; the rest get cleared.
registeredEndpoints := make (map [proxy.ServicePortName]bool )
lb.lock.Lock()
defer lb.lock.Unlock()
for i := range allEndpoints {
svcEndpoints := &allEndpoints[i]
// Flatten subsets into port-name -> list of host:port pairs.
// NOTE: the inner loops deliberately shadow i at each nesting level.
portsToEndpoints := map [string ][]hostPortPair{}
for i := range svcEndpoints.Subsets {
ss := &svcEndpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
portsToEndpoints[port.Name] = append (portsToEndpoints[port.Name], hostPortPair{addr.IP, int (port.Port)})
}
}
}
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
state, exists := lb.services[svcPort]
curEndpoints := []string {}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
// Only rewrite state when the endpoint set actually changed
// (compared as an unordered set via a sorted copy).
if !exists || state == nil || len (curEndpoints) != len (newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(1 ).Infof("LoadBalancerRR: Setting endpoints for %s to %+v" , svcPort, newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints)
// Ensure the service entry exists, then install a shuffled copy
// and restart the round-robin rotation from 0.
state = lb.newServiceInternal(svcPort, api.ServiceAffinity("" ), 0 )
state.endpoints = slice.ShuffleStrings(newEndpoints)
state.index = 0
}
registeredEndpoints[svcPort] = true
}
}
// Clear endpoints (but keep the entry) for services absent from the update.
for k := range lb.services {
if _, exists := registeredEndpoints[k]; !exists {
glog.V(2 ).Infof("LoadBalancerRR: Removing endpoints for %s" , k)
state := lb.services[k]
state.endpoints = []string {}
state.index = 0
state.affinity.affinityMap = map [string ]*affinityState{}
}
}
}
OnEndpointsUpdate()函数就是把config获取到的endpoints放入到LoadBalancerRR中。
Proxier
Proxier定义在/pkg/proxy/userspace/proxier.go中:
// Proxier is the userspace proxy: it tracks one proxy socket per service
// port and keeps iptables portal rules in sync with the service map.
type Proxier struct {
// loadBalancer picks endpoints for services (e.g. LoadBalancerRR).
loadBalancer LoadBalancer
// mu guards serviceMap.
mu sync.Mutex
serviceMap map [proxy.ServicePortName]*serviceInfo
// syncPeriod is the interval between periodic SyncLoop passes.
syncPeriod time.Duration
// minSyncPeriod: not referenced in the visible code — presumably a lower
// bound between syncs; confirm against the full proxier.go.
minSyncPeriod time.Duration
// udpIdleTimeout is passed to addServiceOnPort as the socket timeout.
udpIdleTimeout time.Duration
// portMapMutex guards portMap; portMap usage is outside this excerpt
// (presumably tracks claimed node ports — verify in full source).
portMapMutex sync.Mutex
portMap map [portMapKey]*portMapValue
// numProxyLoops counts running ProxyLoop goroutines (updated atomically).
numProxyLoops int32
// listenIP is the local IP proxy sockets bind to.
listenIP net.IP
// iptables manipulates the NAT portal rules.
iptables iptables.Interface
// hostIP: not referenced in the visible code — confirm usage in full source.
hostIP net.IP
// proxyPorts allocates local ports for new proxy sockets.
proxyPorts PortAllocator
}
Proxier中最重要的字段是serviceMap,存储有ServicePortName和serviceInfo的map关系。serviceInfo定义如下:
// serviceInfo is the proxier's per-service-port record: the local proxy
// socket plus the portal/iptables parameters derived from the Service spec.
type serviceInfo struct {
// isAliveAtomic is non-zero while the proxy loop should keep running;
// set to 1 on creation and read via isAlive().
isAliveAtomic int32
// portal is the cluster IP and service port the iptables rules redirect.
portal portal
protocol api.Protocol
// proxyPort is the local port the proxy socket actually bound.
proxyPort int
socket proxySocket
timeout time.Duration
// activeClients: client-tracking cache (see newClientCache); used by the
// UDP path outside this excerpt.
activeClients *clientCache
nodePort int
loadBalancerStatus api.LoadBalancerStatus
sessionAffinityType api.ServiceAffinity
// stickyMaxAgeMinutes is the affinity TTL handed to the load balancer.
stickyMaxAgeMinutes int
externalIPs []string
}
整个Proxier可以分为两个部分:OnServiceUpdate()和SyncLoop()。
OnServiceUpdate()
先来看OnServiceUpdate(),定义在/pkg/proxy/userspace/proxier.go中:
// OnServiceUpdate reconciles the proxier with the full list of Services:
// per service port it starts (or restarts, on config change) a proxy socket,
// opens the iptables portal, and registers the port with the load balancer;
// afterwards it tears down everything not present in the update.
func (proxier *Proxier) OnServiceUpdate (services []api.Service) {
glog.V(4 ).Infof("Received update notice: %+v" , services)
// Tracks service ports seen in this update; the rest are stopped below.
activeServices := make (map [proxy.ServicePortName]bool )
for i := range services {
service := &services[i]
// Headless/unset-IP services cannot be proxied via a portal.
if !api.IsServiceIPSet(service) {
glog.V(3 ).Infof("Skipping service %s due to clusterIP = %q" , types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
continue
}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
activeServices[serviceName] = true
serviceIP := net.ParseIP(service.Spec.ClusterIP)
info, exists := proxier.getServiceInfo(serviceName)
// Unchanged config: nothing to do for this port.
if exists && sameConfig(info, service, servicePort) {
continue
}
// Changed config: tear down the old portal and proxy first.
if exists {
glog.V(4 ).Infof("Something changed for service %q: stopping it" , serviceName)
err := proxier.closePortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v" , serviceName, err)
}
err = proxier.stopProxy(serviceName, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v" , serviceName, err)
}
}
// New or changed: allocate a local port and start a fresh proxy socket.
proxyPort, err := proxier.proxyPorts.AllocateNext()
if err != nil {
glog.Errorf("failed to allocate proxy port for service %q: %v" , serviceName, err)
continue
}
glog.V(1 ).Infof("Adding new service %q at %s:%d/%s" , serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v" , serviceName, err)
continue
}
// Fill in portal parameters from the Service spec/status.
info.portal.ip = serviceIP
info.portal.port = int (servicePort.Port)
info.externalIPs = service.Spec.ExternalIPs
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
info.nodePort = int (servicePort.NodePort)
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(4 ).Infof("info: %#v" , info)
err = proxier.openPortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to open portal for %q: %v" , serviceName, err)
}
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
}
// Stop every known service port that was absent from this update.
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1 ).Infof("Stopping service %q" , name)
err := proxier.closePortal(name, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v" , name, err)
}
err = proxier.stopProxyInternal(name, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v" , name, err)
}
proxier.loadBalancer.DeleteService(name)
}
}
}
OnServiceUpdate()是proxier.go中处理services的入口,把config获取到的services加入到proxier中,当然也会更新proxier中的loadbalancer。其中针对service中的每个port,根据不同的情况进行不同的处理。最关键的是关于新增部分的处理,调用了addServiceOnPort()函数:
// addServiceOnPort opens a proxy socket for the service on proxyPort,
// records the resulting serviceInfo in the proxier's service map, and spawns
// the goroutine running the socket's ProxyLoop. The serviceInfo is created
// with default affinity settings; the caller fills in portal details.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
	sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
	if err != nil {
		return nil, err
	}
	// Determine the port actually bound from the socket's own address.
	_, portPart, err := net.SplitHostPort(sock.Addr().String())
	if err != nil {
		sock.Close()
		return nil, err
	}
	portNum, err := strconv.Atoi(portPart)
	if err != nil {
		sock.Close()
		return nil, err
	}
	info := &serviceInfo{
		isAliveAtomic:       1,
		proxyPort:           portNum,
		protocol:            protocol,
		socket:              sock,
		timeout:             timeout,
		activeClients:       newClientCache(),
		sessionAffinityType: api.ServiceAffinityNone, // default
		stickyMaxAgeMinutes: 180,                     // default
	}
	proxier.setServiceInfo(service, info)
	glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
	go func() {
		defer runtime.HandleCrash()
		atomic.AddInt32(&proxier.numProxyLoops, 1)
		sock.ProxyLoop(service, info, proxier)
		atomic.AddInt32(&proxier.numProxyLoops, -1)
	}()
	return info, nil
}
addServiceOnPort()函数会使用proxysocket.go中的newProxySocket()新建一个端口的socket连接,然后再调用ProxyLoop()函数进行与endpoint的连接,这些在之前已经分析过。
SyncLoop()
再来看SyncLoop()部分。OnServiceUpdate()把service的内容存储到proxier中,SyncLoop()就会把这些数据与实际程序状态进行同步。来看下SyncLoop()的定义:
// SyncLoop runs forever, invoking Sync once per syncPeriod tick.
func (proxier *Proxier) SyncLoop() {
	ticker := time.NewTicker(proxier.syncPeriod)
	defer ticker.Stop()
	for range ticker.C {
		glog.V(6).Infof("Periodic sync")
		proxier.Sync()
	}
}
SyncLoop()每隔一段时间就调用Sync()。Sync()定义如下:
// Sync re-establishes the base iptables chains, re-opens every service's
// portal rules, and expires stale sticky sessions.
func (proxier *Proxier) Sync() {
	err := iptablesInit(proxier.iptables)
	if err != nil {
		glog.Errorf("Failed to ensure iptables: %v", err)
	}
	proxier.ensurePortals()
	proxier.cleanupStaleStickySessions()
}
Sync()调用了ensurePortals(),定义如下:
// ensurePortals re-opens the portal (iptables rules) for every known service
// port, holding the service-map lock; failures are logged, not fatal.
func (proxier *Proxier) ensurePortals() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	for svcName, svcInfo := range proxier.serviceMap {
		if err := proxier.openPortal(svcName, svcInfo); err != nil {
			glog.Errorf("Failed to ensure portal for %q: %v", svcName, err)
		}
	}
}
ensurePortals()会遍历serviceMap,然后调用openPortal():
// openPortal installs the redirect rules for every address a service is
// reachable on: its cluster IP portal, each external IP, each load-balancer
// ingress IP, and the node port (when set). The first failure aborts.
func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
	if err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service); err != nil {
		return err
	}
	for _, publicIP := range info.externalIPs {
		external := portal{net.ParseIP(publicIP), info.portal.port, true}
		if err := proxier.openOnePortal(external, info.protocol, proxier.listenIP, info.proxyPort, service); err != nil {
			return err
		}
	}
	for _, ingress := range info.loadBalancerStatus.Ingress {
		if ingress.IP == "" {
			continue
		}
		lbPortal := portal{net.ParseIP(ingress.IP), info.portal.port, false}
		if err := proxier.openOnePortal(lbPortal, info.protocol, proxier.listenIP, info.proxyPort, service); err != nil {
			return err
		}
	}
	if info.nodePort != 0 {
		if err := proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service); err != nil {
			return err
		}
	}
	return nil
}
来看openOnePortal():
// openOnePortal installs the NAT rules that redirect traffic for one portal
// address (portal.ip:portal.port) to the local proxy at proxyIP:proxyPort:
// a from-containers rule, and either the external-traffic rule pair (for
// external portals) or the standard from-host rule. It also claims the node
// port when the portal IP is local to this host.
func (proxier *Proxier) openOnePortal (portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int , name proxy.ServicePortName) error {
if local, err := isLocalIP(portal.ip); err != nil {
return fmt.Errorf("can't determine if IP %s is local, assuming not: %v" , portal.ip, err)
} else if local {
// The portal IP belongs to this host: reserve the port so nothing
// else binds it.
err := proxier.claimNodePort(portal.ip, portal.port, protocol, name)
if err != nil {
return err
}
}
// Rule 1: redirect traffic originating from containers.
args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false , portal.port, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q, args:%v" , iptablesContainerPortalChain, name, args)
return err
}
if !existed {
glog.V(3 ).Infof("Opened iptables from-containers portal for service %q on %s %s:%d" , name, protocol, portal.ip, portal.port)
}
if portal.isExternal {
// External portals additionally need a local-traffic container rule
// and a dst-local host rule; they return early and skip the
// standard from-host rule below.
args := proxier.iptablesContainerPortalArgs(portal.ip, false , true , portal.port, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule that opens service %q for local traffic, args:%v" , iptablesContainerPortalChain, name, args)
return err
}
if !existed {
glog.V(3 ).Infof("Opened iptables from-containers portal for service %q on %s %s:%d for local traffic" , name, protocol, portal.ip, portal.port)
}
args = proxier.iptablesHostPortalArgs(portal.ip, true , portal.port, protocol, proxyIP, proxyPort, name)
existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q for dst-local traffic" , iptablesHostPortalChain, name)
return err
}
if !existed {
glog.V(3 ).Infof("Opened iptables from-host portal for service %q on %s %s:%d for dst-local traffic" , name, protocol, portal.ip, portal.port)
}
return nil
}
// Rule 2 (non-external portals): redirect traffic originating on the host.
args = proxier.iptablesHostPortalArgs(portal.ip, false , portal.port, protocol, proxyIP, proxyPort, name)
existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q" , iptablesHostPortalChain, name)
return err
}
if !existed {
glog.V(3 ).Infof("Opened iptables from-host portal for service %q on %s %s:%d" , name, protocol, portal.ip, portal.port)
}
return nil
}
openOnePortal()主要创建两条iptables规则,container流量的iptables规则和host流量的iptables规则。好,现在就建立了service ip到物理主机端口的iptables规则了,也就是说访问service ip,就会把包重定向到物理主机端口,然后proxy就会在endpoints中选择一个endpoint与物理主机端口进行连接,这样,通过物理机的端口,连接起了service ip和endpoint。
proxy分析完毕。