proberManager checks the health of the Pods in a Kubernetes cluster. Kubernetes currently supports two kinds of probes, livenessProbe and readinessProbe:
- livenessProbe: checks whether the program inside a container is healthy, and determines when the container needs to be restarted;
- readinessProbe: checks whether the program inside a container is ready; ready means the container can receive and handle traffic.
Three probing methods are supported: HTTP, TCP, and Exec (running a command or script). A sketch of a probe definition follows.
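For reference, a probe in this source tree is described by api.Probe, whose embedded Handler holds exactly one of Exec, HTTPGet, or TCPSocket. A minimal sketch of an HTTP probe definition, assuming the pkg/api types of this source tree (the field values are illustrative, not defaults):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/util/intstr"
)

func main() {
	// An illustrative HTTP probe: GET /healthz on port 8080, first checked
	// 5s after the container starts, then every 10s; three consecutive
	// failures are needed before the result flips to Failure.
	httpProbe := &api.Probe{
		Handler: api.Handler{
			HTTPGet: &api.HTTPGetAction{
				Path: "/healthz",
				Port: intstr.FromInt(8080),
			},
		},
		InitialDelaySeconds: 5,
		PeriodSeconds:       10,
		FailureThreshold:    3,
	}
	fmt.Println(httpProbe.HTTPGet.Path)
}
```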
proberManager
proberManager is defined in /pkg/kubelet/prober/prober_manager.go as the manager struct:
```go
type manager struct {
	workers    map[probeKey]*worker
	workerLock sync.RWMutex

	statusManager    status.Manager
	readinessManager results.Manager
	livenessManager  results.Manager
	prober           *prober
}
```
The main fields of proberManager are:
- workers: the workers registered with the manager; a worker performs the actual probing, and each container maps to one or two workers (one per configured probe);
- statusManager: the pod status manager;
- readinessManager: where readinessProbe results are stored;
- livenessManager: where livenessProbe results are stored;
- prober: the prober that executes the probes defined on a container.
A proberManager is created with NewManager():
```go
func NewManager(
	statusManager status.Manager,
	livenessManager results.Manager,
	runner kubecontainer.ContainerCommandRunner,
	refManager *kubecontainer.RefManager,
	recorder record.EventRecorder) Manager {

	prober := newProber(runner, refManager, recorder)
	readinessManager := results.NewManager()
	return &manager{
		statusManager:    statusManager,
		prober:           prober,
		readinessManager: readinessManager,
		livenessManager:  livenessManager,
		workers:          make(map[probeKey]*worker),
	}
}
```
manager::Start()
Start() starts the proberManager: a goroutine calls updateReadiness() in an endless loop via wait.Forever(), and each iteration blocks until a readiness update arrives.
```go
func (m *manager) Start() {
	go wait.Forever(m.updateReadiness, 0)
}
```
manager::updateReadiness()
updateReadiness() takes the next update from the readinessManager and propagates the container's new ready state to the statusManager.
```go
func (m *manager) updateReadiness() {
	update := <-m.readinessManager.Updates()

	ready := update.Result == results.Success
	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
```
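The update received here is a results.Update, which in results_manager.go is defined (roughly) as follows — it carries the container ID, the new result, and the pod UID:

```go
type Update struct {
	ContainerID kubecontainer.ContainerID
	Result      Result
	PodUID      types.UID
}
```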
manager::AddPod()
AddPod() inspects each container definition in the pod: if a ReadinessProbe is defined, it creates a readiness worker for the container; if a LivenessProbe is defined, it creates a liveness worker. Each new worker is added to the proberManager's workers map and started.
```go
func (m *manager) AddPod(pod *api.Pod) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	key := probeKey{podUID: pod.UID}
	for _, c := range pod.Spec.Containers {
		key.containerName = c.Name

		if c.ReadinessProbe != nil {
			key.probeType = readiness
			if _, ok := m.workers[key]; ok {
				glog.Errorf("Readiness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, readiness, pod, c)
			m.workers[key] = w
			go w.run()
		}

		if c.LivenessProbe != nil {
			key.probeType = liveness
			if _, ok := m.workers[key]; ok {
				glog.Errorf("Liveness probe already exists! %v - %v",
					format.Pod(pod), c.Name)
				return
			}
			w := newWorker(m, liveness, pod, c)
			m.workers[key] = w
			go w.run()
		}
	}
}
```
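The workers map is keyed by probeKey, which combines the pod UID, the container name, and the probe type; this is why each container can have at most one readiness worker and one liveness worker. In prober_manager.go these types are defined (roughly) as:

```go
type probeType int

const (
	liveness probeType = iota
	readiness
)

type probeKey struct {
	podUID        types.UID
	containerName string
	probeType     probeType
}
```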
manager::RemovePod()
RemovePod() looks up the pod's readiness and liveness workers and stops them.
```go
func (m *manager) RemovePod(pod *api.Pod) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	key := probeKey{podUID: pod.UID}
	for _, c := range pod.Spec.Containers {
		key.containerName = c.Name
		for _, probeType := range [...]probeType{readiness, liveness} {
			key.probeType = probeType
			if worker, ok := m.workers[key]; ok {
				worker.stop()
			}
		}
	}
}
```
manager::CleanupPods()
CleanupPods() stops the workers of every pod that is not in the given list of active pods.
```go
func (m *manager) CleanupPods(activePods []*api.Pod) {
	desiredPods := make(map[types.UID]sets.Empty)
	for _, pod := range activePods {
		desiredPods[pod.UID] = sets.Empty{}
	}

	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	for key, worker := range m.workers {
		if _, ok := desiredPods[key.podUID]; !ok {
			worker.stop()
		}
	}
}
```
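The desiredPods map is used as a set: any worker whose pod UID is absent from it gets stopped. A self-contained sketch of the same set-difference pattern (the names are hypothetical):

```go
package main

import "fmt"

func main() {
	// Build the set of pods we want to keep.
	desired := map[string]struct{}{"pod-a": {}, "pod-b": {}}

	// Stop everything that is running but no longer desired.
	for _, uid := range []string{"pod-a", "pod-b", "pod-c"} {
		if _, ok := desired[uid]; !ok {
			fmt.Printf("stopping workers for %s\n", uid) // prints pod-c only
		}
	}
}
```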
manager::UpdatePodStatus()
UpdatePodStatus() fills in the Ready field of each container status in the given PodStatus: a container that is not running is never ready; if the readinessManager has a cached result, that result is used; otherwise the container counts as ready only when no readiness worker exists for it, i.e. no readinessProbe was defined. Init containers are ready once they have terminated with exit code 0.
```go
func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
	for i, c := range podStatus.ContainerStatuses {
		var ready bool
		if c.State.Running == nil {
			ready = false
		} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
			ready = result == results.Success
		} else {
			_, exists := m.getWorker(podUID, c.Name, readiness)
			ready = !exists
		}
		podStatus.ContainerStatuses[i].Ready = ready
	}
	for i, c := range podStatus.InitContainerStatuses {
		var ready bool
		if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
			ready = true
		}
		podStatus.InitContainerStatuses[i].Ready = ready
	}
}
```
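The readiness decision condenses to three cases; a self-contained sketch (the helper is hypothetical, written only to mirror the branches above):

```go
package main

import "fmt"

// ready mirrors the three branches of UpdatePodStatus(): cached points to a
// probe result when the readinessManager has one; hasWorker says whether a
// readiness worker exists for the container.
func ready(running bool, cached *bool, hasWorker bool) bool {
	switch {
	case !running:
		return false // a container that is not running is never ready
	case cached != nil:
		return *cached // use the cached probe result when there is one
	default:
		return !hasWorker // no result yet: ready only if no probe is defined
	}
}

func main() {
	fmt.Println(ready(true, nil, false)) // true: running, no readinessProbe defined
	fmt.Println(ready(true, nil, true))  // false: probe defined but no result yet
}
```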
manager::getWorker()
getWorker() looks up the worker for a given pod UID, container name, and probe type.
```go
func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
	worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
	return worker, ok
}
```
manager::removeWorker()
removeWorker() deletes a worker from the workers map.
```go
func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()
	delete(m.workers, probeKey{podUID, containerName, probeType})
}
```
worker
worker is defined in /pkg/kubelet/prober/worker.go:
```go
type worker struct {
	stopCh         chan struct{}
	pod            *api.Pod
	container      api.Container
	spec           *api.Probe
	probeType      probeType
	initialValue   results.Result
	resultsManager results.Manager
	probeManager   *manager
	containerID    kubecontainer.ContainerID
	lastResult     results.Result
	resultRun      int
	onHold         bool
}
```
A worker performs the actual probing. Its main fields are:
- probeType: whether this worker runs a livenessProbe or a readinessProbe;
- resultsManager: where this worker's probe results are stored;
- probeManager: the proberManager that owns this worker.
A worker is created with newWorker():
```go
func newWorker(
	m *manager,
	probeType probeType,
	pod *api.Pod,
	container api.Container) *worker {

	w := &worker{
		stopCh:       make(chan struct{}, 1),
		pod:          pod,
		container:    container,
		probeType:    probeType,
		probeManager: m,
	}

	switch probeType {
	case readiness:
		w.spec = container.ReadinessProbe
		w.resultsManager = m.readinessManager
		w.initialValue = results.Failure
	case liveness:
		w.spec = container.LivenessProbe
		w.resultsManager = m.livenessManager
		w.initialValue = results.Success
	}

	return w
}
```
Note that when a worker is created, its resultsManager is set to the probeManager's readinessManager or livenessManager according to the probe type. The initial values differ as well: a readiness worker starts from Failure (a container is not ready until a probe succeeds), while a liveness worker starts from Success (a container is assumed alive until a probe fails).
A worker is started with run():
```go
func (w *worker) run() {
	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
	probeTicker := time.NewTicker(probeTickerPeriod)

	defer func() {
		probeTicker.Stop()
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
	}()

	time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

probeLoop:
	for w.doProbe() {
		select {
		case <-w.stopCh:
			break probeLoop
		case <-probeTicker.C:
		}
	}
}
```
run() builds a ticker from the PeriodSeconds set in the probe spec and calls doProbe() on every tick. Before the first probe it sleeps for a random fraction of the period, so that workers started at the same time (for example after a kubelet restart) do not all fire at once; a sketch of this pattern follows.
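A self-contained sketch of that jitter-then-tick pattern (values are illustrative):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	period := 10 * time.Second

	// Delay the first probe by a random fraction of the period so that
	// workers created at the same time do not all probe simultaneously.
	jitter := time.Duration(rand.Float64() * float64(period))
	fmt.Printf("first probe after %v, then every %v\n", jitter, period)

	ticker := time.NewTicker(period)
	defer ticker.Stop()
	// In the real worker, each tick triggers doProbe() until stopCh fires.
}
```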
doProbe() is the function that actually performs a probe. A container can set InitialDelaySeconds as the time it needs to initialize; while a container is still inside that initial delay, neither its liveness worker nor its readiness worker probes it. In addition, a probe result only updates the container's state, i.e. gets stored into the resultsManager, after the same result has repeated often enough to cross the configured threshold (FailureThreshold or SuccessThreshold); see the sketch after the code.
```go
func (w *worker) doProbe() (keepGoing bool) {
	defer func() { recover() }()
	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

	status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
	if !ok {
		glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
		return true
	}

	if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
		glog.V(3).Infof("Pod %v %v, exiting probe worker",
			format.Pod(w.pod), status.Phase)
		return false
	}

	c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
	if !ok || len(c.ContainerID) == 0 {
		glog.V(3).Infof("Probe target container not found: %v - %v",
			format.Pod(w.pod), w.container.Name)
		return true
	}

	if w.containerID.String() != c.ContainerID {
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
		w.onHold = false
	}

	if w.onHold {
		return true
	}

	if c.State.Running == nil {
		glog.V(3).Infof("Non-running container probed: %v - %v",
			format.Pod(w.pod), w.container.Name)
		if !w.containerID.IsEmpty() {
			w.resultsManager.Set(w.containerID, results.Failure, w.pod)
		}
		return c.State.Terminated == nil || w.pod.Spec.RestartPolicy != api.RestartPolicyNever
	}

	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
		return true
	}

	result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
	if err != nil {
		return true
	}

	if w.lastResult == result {
		w.resultRun++
	} else {
		w.lastResult = result
		w.resultRun = 1
	}

	if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
		return true
	}

	w.resultsManager.Set(w.containerID, result, w.pod)

	if w.probeType == liveness && result == results.Failure {
		w.onHold = true
	}

	return true
}
```
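The lastResult/resultRun pair near the end of doProbe() implements threshold counting: a result is published only after it has repeated enough consecutive times, which filters out one-off flakes. A self-contained sketch of that logic:

```go
package main

import "fmt"

func main() {
	const failureThreshold = 3
	lastResult, resultRun := "success", 0

	for _, r := range []string{"failure", "failure", "success", "failure", "failure", "failure"} {
		if r == lastResult {
			resultRun++
		} else {
			lastResult, resultRun = r, 1
		}
		// Only the third consecutive failure crosses the threshold.
		if r == "failure" && resultRun >= failureThreshold {
			fmt.Println("publish Failure to resultsManager")
		}
	}
}
```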
resultManager
resultManager is defined in /pkg/kubelet/prober/results/results_manager.go:
```go
type manager struct {
	sync.RWMutex
	cache   map[kubecontainer.ContainerID]Result
	updates chan Update
}

var _ Manager = &manager{}

func NewManager() Manager {
	return &manager{
		cache:   make(map[kubecontainer.ContainerID]Result),
		updates: make(chan Update, 20),
	}
}
```
As shown, the resultManager keeps a cache of probe results per container, plus an updates channel that carries changes.
Get() reads a result straight from the cache:
```go
func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
	m.RLock()
	defer m.RUnlock()
	result, found := m.cache[id]
	return result, found
}
```
Set() submits an update to the resultManager:
```go
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
	if m.setInternal(id, result) {
		m.updates <- Update{id, result, pod.UID}
	}
}

func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
	m.Lock()
	defer m.Unlock()
	prev, exists := m.cache[id]
	if !exists || prev != result {
		m.cache[id] = result
		return true
	}
	return false
}
```
Besides updating the cache, Set() pushes the change into the updates channel, and it does so only when the cached result actually changed.
The updates channel is exposed through Updates():
```go
func (m *manager) Updates() <-chan Update {
	return m.updates
}
```
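Putting the pieces together, a self-contained miniature of this cache-plus-channel pattern (mutex omitted for brevity; all names are hypothetical):

```go
package main

import "fmt"

type miniManager struct {
	cache   map[string]bool
	updates chan string
}

// set caches the result and publishes an update only when the value changed,
// just like Set()/setInternal() above.
func (m *miniManager) set(id string, healthy bool) {
	if prev, exists := m.cache[id]; !exists || prev != healthy {
		m.cache[id] = healthy
		m.updates <- fmt.Sprintf("%s -> %v", id, healthy)
	}
}

func main() {
	m := &miniManager{cache: map[string]bool{}, updates: make(chan string, 20)}
	m.set("ctr-1", true)
	m.set("ctr-1", true) // unchanged: no update published
	m.set("ctr-1", false)
	close(m.updates)
	for u := range m.updates {
		fmt.Println(u) // ctr-1 -> true, ctr-1 -> false
	}
}
```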
prober
The prober performs the actual probes and is defined in /pkg/kubelet/prober/prober.go:
```go
type prober struct {
	exec   execprobe.ExecProber
	http   httprobe.HTTPProber
	tcp    tcprobe.TCPProber
	runner kubecontainer.ContainerCommandRunner

	refManager *kubecontainer.RefManager
	recorder   record.EventRecorder
}
```
The prober holds exec, http, and tcp fields for the three probing methods.
A prober is created with newProber():
```go
func newProber(
	runner kubecontainer.ContainerCommandRunner,
	refManager *kubecontainer.RefManager,
	recorder record.EventRecorder) *prober {

	return &prober{
		exec:       execprobe.New(),
		http:       httprobe.New(),
		tcp:        tcprobe.New(),
		runner:     runner,
		refManager: refManager,
		recorder:   recorder,
	}
}
```
A probe is executed through the probe() method:
```go
func (pb *prober) probe(probeType probeType, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
	var probeSpec *api.Probe
	switch probeType {
	case readiness:
		probeSpec = container.ReadinessProbe
	case liveness:
		probeSpec = container.LivenessProbe
	default:
		return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
	}

	ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
	if probeSpec == nil {
		glog.Warningf("%s probe for %s is nil", probeType, ctrName)
		return results.Success, nil
	}

	result, output, err := pb.runProbeWithRetries(probeSpec, pod, status, container, containerID, maxProbeRetries)
	if err != nil || result != probe.Success {
		ref, hasRef := pb.refManager.GetRef(containerID)
		if !hasRef {
			glog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
		}
		if err != nil {
			glog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
			if hasRef {
				pb.recorder.Eventf(ref, api.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
			}
		} else {
			glog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
			if hasRef {
				pb.recorder.Eventf(ref, api.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
			}
		}
		return results.Failure, err
	}
	glog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
	return results.Success, nil
}
```
probe() mainly delegates to runProbeWithRetries():
```go
func (pb *prober) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
	var err error
	var result probe.Result
	var output string
	for i := 0; i < retries; i++ {
		result, output, err = pb.runProbe(p, pod, status, container, containerID)
		if err == nil {
			return result, output, nil
		}
	}
	return result, output, err
}
```
runProbeWithRetries() calls runProbe() up to the given number of retries so that a probe that errors out can still complete; note that only errors are retried — a probe that runs and returns a non-success result is not re-run here. runProbe() is defined as follows:
```go
func (pb *prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
	timeout := time.Duration(p.TimeoutSeconds) * time.Second
	if p.Exec != nil {
		glog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
		return pb.exec.Probe(pb.newExecInContainer(container, containerID, p.Exec.Command, timeout))
	}
	if p.HTTPGet != nil {
		scheme := strings.ToLower(string(p.HTTPGet.Scheme))
		host := p.HTTPGet.Host
		if host == "" {
			host = status.PodIP
		}
		port, err := extractPort(p.HTTPGet.Port, container)
		if err != nil {
			return probe.Unknown, "", err
		}
		path := p.HTTPGet.Path
		glog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
		url := formatURL(scheme, host, port, path)
		headers := buildHeader(p.HTTPGet.HTTPHeaders)
		glog.V(4).Infof("HTTP-Probe Headers: %v", headers)
		return pb.http.Probe(url, headers, timeout)
	}
	if p.TCPSocket != nil {
		port, err := extractPort(p.TCPSocket.Port, container)
		if err != nil {
			return probe.Unknown, "", err
		}
		glog.V(4).Infof("TCP-Probe PodIP: %v, Port: %v, Timeout: %v", status.PodIP, port, timeout)
		return pb.tcp.Probe(status.PodIP, port, timeout)
	}
	glog.Warningf("Failed to find probe builder for container: %v", container)
	return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
}
```
runProbe() checks the probe handlers in the order Exec, HTTPGet, TCPSocket, executes the first one that is set, and returns its result.
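For the HTTPGet case, formatURL() assembles the probe URL from scheme, host, port, and path. A self-contained sketch of the equivalent construction using only the standard library (values are illustrative):

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

func main() {
	// Mirrors what an HTTP probe queries: scheme://host:port/path,
	// where host defaults to the pod IP when HTTPGet.Host is empty.
	u := &url.URL{
		Scheme: "http",
		Host:   net.JoinHostPort("10.0.0.5", strconv.Itoa(8080)),
		Path:   "/healthz",
	}
	fmt.Println(u.String()) // http://10.0.0.5:8080/healthz
}
```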
readinessManager
As already shown in the proberManager section:
```go
func (m *manager) updateReadiness() {
	update := <-m.readinessManager.Updates()

	ready := update.Result == results.Success
	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
```
SetContainerReadiness() updates the ready state of both the container and its pod; that code is not analyzed further here.
livenessManager
The livenessManager is where liveness probe results are stored. So how does the kubelet learn that a container needs to be restarted?
Start with /pkg/kubelet/kubelet.go:
```go
klet.probeManager = prober.NewManager(
	klet.statusManager,
	klet.livenessManager,
	klet.runner,
	containerRefManager,
	kubeDeps.Recorder)
```
The kubelet passes its own livenessManager to NewManager() when it creates the proberManager, so container liveness results end up stored in the kubelet's livenessManager.
The same livenessManager is also passed in when the dockerManager is created:
```go
case "docker":
	runtime := dockertools.NewDockerManager(
		kubeDeps.DockerClient,
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
		klet.livenessManager,
		containerRefManager,
		klet.podManager,
		......
```
This is why computePodContainerChanges() in /pkg/kubelet/dockertools/docker_manager.go contains:
```go
liveness, found := dm.livenessManager.Get(containerStatus.ID)
if !found || liveness == proberesults.Success {
	containersToKeep[containerID] = index
	continue
}
```
That is, a container with no entry in the livenessManager, or with a result of Success, is kept as-is; conversely, a container whose result in the livenessManager is Failure is not kept and gets restarted.