proberManager checks the health of Pods in a Kubernetes cluster. Kubernetes currently supports two probe types, livenessProbe and readinessProbe:

  • livenessProbe: checks whether the process in a container is healthy and decides when the container should be restarted;
  • readinessProbe: checks whether the process in a container is ready; ready means the container can receive and handle traffic.

Three probe mechanisms are currently supported: HTTP, TCP, and EXEC (running a command or script).
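
As a concrete illustration, here is a minimal sketch of how the two probe types and three mechanisms are declared on a container, assuming the internal pkg/api and pkg/util/intstr packages used by the code below (field names may differ in other versions; the container itself is a made-up example):

container := api.Container{
    Name:  "web",
    Image: "nginx",
    // livenessProbe: HTTP GET on port 80; repeated failures restart the container.
    LivenessProbe: &api.Probe{
        Handler: api.Handler{
            HTTPGet: &api.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(80)},
        },
        InitialDelaySeconds: 10, // give the process time to start before probing
        PeriodSeconds:       10, // probe every 10 seconds
        FailureThreshold:    3,  // 3 consecutive failures mark the container unhealthy
    },
    // readinessProbe: TCP connect on port 80; failures stop traffic to the container.
    ReadinessProbe: &api.Probe{
        Handler: api.Handler{
            TCPSocket: &api.TCPSocketAction{Port: intstr.FromInt(80)},
        },
        PeriodSeconds:    5,
        SuccessThreshold: 1,
    },
    // An EXEC probe would instead set Handler.Exec = &api.ExecAction{Command: []string{...}}.
}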

proberManager

proberManager is defined as the manager struct in /pkg/kubelet/prober/prober_manager.go:

type manager struct {
    // Map of active workers for probes
    workers map[probeKey]*worker
    // Lock for accessing & mutating workers
    workerLock sync.RWMutex
    // The statusManager cache provides pod IP and container IDs for probing.
    statusManager status.Manager
    // readinessManager manages the results of readiness probes
    readinessManager results.Manager
    // livenessManager manages the results of liveness probes
    livenessManager results.Manager
    // prober executes the probe actions.
    prober *prober
}

The main fields of proberManager are:

  • workers: the workers registered with the manager; each worker performs the actual probing, and a container maps to one or two workers (one per configured probe);
  • statusManager: the manager for pod status;
  • readinessManager: where readinessProbe results are stored;
  • livenessManager: where livenessProbe results are stored;
  • prober: the prober that executes the probes defined on the containers.

A proberManager is created with NewManager():

func NewManager(
    statusManager status.Manager,
    livenessManager results.Manager,
    runner kubecontainer.ContainerCommandRunner,
    refManager *kubecontainer.RefManager,
    recorder record.EventRecorder) Manager {
    prober := newProber(runner, refManager, recorder)
    readinessManager := results.NewManager()
    return &manager{
        statusManager:    statusManager,
        prober:           prober,
        readinessManager: readinessManager,
        livenessManager:  livenessManager,
        workers:          make(map[probeKey]*worker),
    }
}

manager::Start()

Start() starts the proberManager. Once started, the proberManager repeatedly calls updateReadiness() in a goroutine.

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
    // Start syncing readiness.
    go wait.Forever(m.updateReadiness, 0)
}

manager::updateReadiness()

updateReadiness() pulls the latest readiness result from the readinessManager and then updates the container through the statusManager.

// updateReadiness() runs in the goroutine started by Start().
func (m *manager) updateReadiness() {
    // Read the next update from the readinessManager's updates channel.
    update := <-m.readinessManager.Updates()
    ready := update.Result == results.Success
    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

manager::AddPod()

AddPod() inspects each container definition in the pod: if a ReadinessProbe is defined, it creates a readiness worker for that container; if a LivenessProbe is defined, it creates a liveness worker. Each new worker is added to the proberManager's workers map and then started.

func (m *manager) AddPod(pod *api.Pod) {
    m.workerLock.Lock()
    defer m.workerLock.Unlock()
    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name
        // Handle the ReadinessProbe.
        if c.ReadinessProbe != nil {
            key.probeType = readiness
            if _, ok := m.workers[key]; ok {
                glog.Errorf("Readiness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            // Create a new worker ...
            w := newWorker(m, readiness, pod, c)
            m.workers[key] = w
            // ... and start it.
            go w.run()
        }
        // Handle the LivenessProbe.
        if c.LivenessProbe != nil {
            key.probeType = liveness
            if _, ok := m.workers[key]; ok {
                glog.Errorf("Liveness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, liveness, pod, c)
            m.workers[key] = w
            go w.run()
        }
    }
}

manager::RemovePod()

RemovePod() looks up the pod's readiness and liveness workers and stops them.

func (m *manager) RemovePod(pod *api.Pod) {
    m.workerLock.RLock()
    defer m.workerLock.RUnlock()
    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name
        for _, probeType := range [...]probeType{readiness, liveness} {
            key.probeType = probeType
            if worker, ok := m.workers[key]; ok {
                // Stop the readiness and liveness workers.
                worker.stop()
            }
        }
    }
}

manager::CleanupPods()

CleanupPods() stops the workers of any pod that is no longer in the active set.

// Stop the workers of any pod that is not in activePods.
func (m *manager) CleanupPods(activePods []*api.Pod) {
    desiredPods := make(map[types.UID]sets.Empty)
    for _, pod := range activePods {
        desiredPods[pod.UID] = sets.Empty{}
    }
    m.workerLock.RLock()
    defer m.workerLock.RUnlock()
    for key, worker := range m.workers {
        if _, ok := desiredPods[key.podUID]; !ok {
            worker.stop()
        }
    }
}

manager::UpdatePodStatus()

UpdatePodStatus() fills in the Ready field of each container status in the given pod status.

// This is where the ready logic lives: compute the current ready state of the pod's containers.
func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
    for i, c := range podStatus.ContainerStatuses {
        var ready bool
        if c.State.Running == nil {
            ready = false
        } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
            ready = result == results.Success
        } else {
            // The check whether there is a probe which hasn't run yet.
            _, exists := m.getWorker(podUID, c.Name, readiness)
            ready = !exists
        }
        podStatus.ContainerStatuses[i].Ready = ready
    }
    // init containers are ready if they have exited with success or if a readiness probe has
    // succeeded.
    for i, c := range podStatus.InitContainerStatuses {
        var ready bool
        if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
            ready = true
        }
        podStatus.InitContainerStatuses[i].Ready = ready
    }
}

manager::getWorker()

getWorker() returns the worker for a given pod, container, and probe type.

// Look up a specific worker.
func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
    m.workerLock.RLock()
    defer m.workerLock.RUnlock()
    worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
    return worker, ok
}

manager::removeWorker()

removeWorker() deletes a worker from the workers map.

// Remove a worker from the workers map.
func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
    m.workerLock.Lock()
    defer m.workerLock.Unlock()
    delete(m.workers, probeKey{podUID, containerName, probeType})
}

worker

worker is defined in /pkg/kubelet/prober/worker.go:

type worker struct {
    // Channel for stopping the probe.
    stopCh chan struct{}
    // The pod containing this probe (read-only)
    pod *api.Pod
    // The container to probe (read-only)
    container api.Container
    // Describes the probe configuration (read-only)
    spec *api.Probe
    // The type of the worker.
    probeType probeType
    // The probe value during the initial delay.
    initialValue results.Result
    // Where to store this workers results.
    resultsManager results.Manager
    probeManager   *manager
    // The last known container ID for this worker.
    containerID kubecontainer.ContainerID
    // The last probe result for this worker.
    lastResult results.Result
    // How many times in a row the probe has returned the same result.
    resultRun int
    // If set, skip probing.
    onHold bool
}

A worker carries out the actual probing. Its main fields are:

  • probeType: whether this worker handles the livenessProbe or the readinessProbe;
  • resultsManager: where this worker's probe results are stored;
  • probeManager: the proberManager that owns this worker.

A worker is created with newWorker():

// Creates and starts a new probe worker.
func newWorker(
    m *manager,
    probeType probeType,
    pod *api.Pod,
    container api.Container) *worker {
    w := &worker{
        stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
        pod:          pod,
        container:    container,
        probeType:    probeType,
        probeManager: m,
    }
    switch probeType {
    case readiness:
        w.spec = container.ReadinessProbe
        w.resultsManager = m.readinessManager
        w.initialValue = results.Failure
    case liveness:
        w.spec = container.LivenessProbe
        w.resultsManager = m.livenessManager
        w.initialValue = results.Success
    }
    return w
}

Note that when a worker is created, its resultsManager is set to either the proberManager's readinessManager or its livenessManager depending on the probe type, and its initialValue differs accordingly: a readiness probe starts out as Failure (not ready until proven otherwise), while a liveness probe starts out as Success.

A worker is started with run():

// Start the worker loop.
func (w *worker) run() {
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
    probeTicker := time.NewTicker(probeTickerPeriod)
    defer func() {
        // Clean up.
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
    }()
    // If kubelet restarted the probes could be started in rapid succession.
    // Let the worker wait for a random portion of tickerPeriod before probing.
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
probeLoop:
    // Run doProbe() repeatedly until it returns false or the worker is stopped.
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        // Timer tick.
        case <-probeTicker.C:
            // continue
        }
    }
}

run() builds a ticker from the PeriodSeconds configured in the container spec and invokes doProbe() on every tick (after an initial random delay to spread probes out when kubelet restarts).

doProbe() is the function that actually performs a probe. A container can set InitialDelaySeconds to allow for initialization time; while the container has been running for less than that delay, neither the liveness worker nor the readiness worker probes it. In addition, a result is only written to the resultsManager (and thus reflected in the container's state) once the same result has been observed enough consecutive times to reach the configured FailureThreshold or SuccessThreshold.
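
Before the full function, here is a minimal sketch of that threshold bookkeeping pulled out on its own (shouldPublish is a hypothetical helper name; lastResult, resultRun, and spec are the worker fields shown above):

func shouldPublish(w *worker, result results.Result) bool {
    if w.lastResult == result {
        w.resultRun++ // same result as last time: the streak grows
    } else {
        w.lastResult = result // result flipped: start a new streak
        w.resultRun = 1
    }
    // Publish only once the streak reaches the configured threshold.
    if result == results.Failure && w.resultRun < int(w.spec.FailureThreshold) {
        return false
    }
    if result == results.Success && w.resultRun < int(w.spec.SuccessThreshold) {
        return false
    }
    return true
}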

// Run one probe iteration.
func (w *worker) doProbe() (keepGoing bool) {
    defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
    defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
    // If there is no pod status yet, return true and wait.
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        // Either the pod has not been created yet, or it was already deleted.
        glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }
    // Worker should terminate if pod is terminated.
    // If status.Phase is api.PodFailed or api.PodSucceeded, return false.
    if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
        glog.V(3).Infof("Pod %v %v, exiting probe worker",
            format.Pod(w.pod), status.Phase)
        return false
    }
    c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        // Either the container has not been created yet, or it was deleted.
        glog.V(3).Infof("Probe target container not found: %v - %v",
            format.Pod(w.pod), w.container.Name)
        return true // Wait for more information.
    }
    // A new container: reset the stored result and resume probing.
    if w.containerID.String() != c.ContainerID {
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // We've got a new container; resume probing.
        w.onHold = false
    }
    // If the worker is on hold, return true and keep waiting for a new container.
    if w.onHold {
        // Worker is on hold until there is a new container.
        return true
    }
    // The container is not running.
    // Abort (return false) only if it has terminated and the pod's RestartPolicy is api.RestartPolicyNever.
    if c.State.Running == nil {
        glog.V(3).Infof("Non-running container probed: %v - %v",
            format.Pod(w.pod), w.container.Name)
        if !w.containerID.IsEmpty() {
            w.resultsManager.Set(w.containerID, results.Failure, w.pod)
        }
        // Abort if the container will not be restarted.
        return c.State.Terminated == nil ||
            w.pod.Spec.RestartPolicy != api.RestartPolicyNever
    }
    // Still inside InitialDelaySeconds: return true without probing.
    // Neither liveness nor readiness checks run during the initial delay.
    if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }
    // Run the actual probe via prober.probe().
    result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        // Prober error, throw away the result.
        return true
    }
    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }
    // Threshold not reached yet: return true without changing the stored state.
    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        // Success or failure is below threshold - leave the probe state unchanged.
        return true
    }
    // Threshold reached: publish the result, which updates the pod's ready/liveness state.
    w.resultsManager.Set(w.containerID, result, w.pod)
    if w.probeType == liveness && result == results.Failure {
        // The container fails a liveness check, it will need to be restared.
        // Stop probing until we see a new container ID. This is to reduce the
        // chance of hitting #21751, where running `docker exec` when a
        // container is being stopped may lead to corrupted container state.
        w.onHold = true
    }
    return true
}

resultManager

resultManager is defined in /pkg/kubelet/prober/results/results_manager.go:

type manager struct {
    // guards the cache
    sync.RWMutex
    // map of container ID -> probe Result
    cache map[kubecontainer.ContainerID]Result
    // channel of updates
    updates chan Update
}

var _ Manager = &manager{}

// NewManager creates ane returns an empty results manager.
func NewManager() Manager {
    return &manager{
        cache:   make(map[kubecontainer.ContainerID]Result),
        updates: make(chan Update, 20),
    }
}

As the struct shows, resultManager keeps a cache of probe results keyed by container ID, plus an updates channel for publishing changes.

A result can be read straight from the cache with Get():

func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
    m.RLock()
    defer m.RUnlock()
    result, found := m.cache[id]
    return result, found
}

An update is submitted to the resultManager with Set():

func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
    if m.setInternal(id, result) {
        m.updates <- Update{id, result, pod.UID}
    }
}

// Internal helper for locked portion of set. Returns whether an update should be sent.
func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
    m.Lock()
    defer m.Unlock()
    prev, exists := m.cache[id]
    if !exists || prev != result {
        m.cache[id] = result
        return true
    }
    return false
}

Besides updating the cache, Set() pushes the change onto the updates channel, but only when the result has actually changed (setInternal() returns true).

The updates channel itself is exposed through Updates():

func (m *manager) Updates() <-chan Update {
    return m.updates
}
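
Putting the three methods together, a small usage sketch (the container ID string and the empty pod are illustrative placeholders):

m := results.NewManager()
id := kubecontainer.ParseContainerID("docker://abc123")
pod := &api.Pod{} // placeholder; in kubelet this is the probed pod

// Set() updates the cache and, because the value changed, queues an Update.
m.Set(id, results.Failure, pod)

// Get() reads the cached result directly.
if r, ok := m.Get(id); ok && r == results.Failure {
    // react to the failure
}

// Updates() exposes the channel; proberManager.updateReadiness() blocks on it.
update := <-m.Updates() // the Update carries the container ID, the result, and the pod UID
_ = update

// Setting the same result again does not queue a second Update.
m.Set(id, results.Failure, pod)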

prober

prober performs the actual probe and is defined in /pkg/kubelet/prober/prober.go:

// Prober helps to check the liveness/readiness of a container.
type prober struct {
    exec       execprobe.ExecProber
    http       httprobe.HTTPProber
    tcp        tcprobe.TCPProber
    runner     kubecontainer.ContainerCommandRunner
    refManager *kubecontainer.RefManager
    recorder   record.EventRecorder
}

The exec, http, and tcp fields of prober implement the three probe mechanisms.

A prober is created with newProber():

// NewProber creates a Prober, it takes a command runner and
// several container info managers.
func newProber(
    runner kubecontainer.ContainerCommandRunner,
    refManager *kubecontainer.RefManager,
    recorder record.EventRecorder) *prober {
    return &prober{
        exec:       execprobe.New(),
        http:       httprobe.New(),
        tcp:        tcprobe.New(),
        runner:     runner,
        refManager: refManager,
        recorder:   recorder,
    }
}

A probe is carried out with the probe() method:

// Run the probe.
func (pb *prober) probe(probeType probeType, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
    var probeSpec *api.Probe
    switch probeType {
    case readiness:
        probeSpec = container.ReadinessProbe
    case liveness:
        probeSpec = container.LivenessProbe
    default:
        return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
    }
    ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
    if probeSpec == nil {
        glog.Warningf("%s probe for %s is nil", probeType, ctrName)
        return results.Success, nil
    }
    // Delegate to runProbeWithRetries().
    result, output, err := pb.runProbeWithRetries(probeSpec, pod, status, container, containerID, maxProbeRetries)
    if err != nil || result != probe.Success {
        // Probe failed in one way or another.
        ref, hasRef := pb.refManager.GetRef(containerID)
        if !hasRef {
            glog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
        }
        if err != nil {
            glog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
            if hasRef {
                pb.recorder.Eventf(ref, api.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
            }
        } else { // result != probe.Success
            glog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
            if hasRef {
                pb.recorder.Eventf(ref, api.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
            }
        }
        return results.Failure, err
    }
    glog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
    return results.Success, nil
}

probe() mainly delegates to runProbeWithRetries():

// Wrapper around runProbe() that retries up to the given number of times on error.
func (pb *prober) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
    var err error
    var result probe.Result
    var output string
    for i := 0; i < retries; i++ {
        result, output, err = pb.runProbe(p, pod, status, container, containerID)
        if err == nil {
            return result, output, nil
        }
    }
    return result, output, err
}

runProbeWithRetries() calls runProbe() repeatedly, up to the given number of retries, but only when runProbe() returns an error; a probe that runs and reports a failure is not retried. runProbe() is defined as follows:

// Run the probe, checking the handlers in the order Exec, HTTPGet, TCPSocket.
func (pb *prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    timeout := time.Duration(p.TimeoutSeconds) * time.Second
    if p.Exec != nil {
        glog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
        return pb.exec.Probe(pb.newExecInContainer(container, containerID, p.Exec.Command, timeout))
    }
    if p.HTTPGet != nil {
        scheme := strings.ToLower(string(p.HTTPGet.Scheme))
        host := p.HTTPGet.Host
        if host == "" {
            host = status.PodIP
        }
        port, err := extractPort(p.HTTPGet.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        path := p.HTTPGet.Path
        glog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
        url := formatURL(scheme, host, port, path)
        headers := buildHeader(p.HTTPGet.HTTPHeaders)
        glog.V(4).Infof("HTTP-Probe Headers: %v", headers)
        return pb.http.Probe(url, headers, timeout)
    }
    if p.TCPSocket != nil {
        port, err := extractPort(p.TCPSocket.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        glog.V(4).Infof("TCP-Probe PodIP: %v, Port: %v, Timeout: %v", status.PodIP, port, timeout)
        return pb.tcp.Probe(status.PodIP, port, timeout)
    }
    glog.Warningf("Failed to find probe builder for container: %v", container)
    return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

runProbe() checks the handlers in the order Exec, HTTPGet, TCPSocket, runs the first one that is defined, and returns its result.

readinessManager

As already shown in the proberManager section:

// updateReadiness() runs in the goroutine started by Start().
func (m *manager) updateReadiness() {
    // Read the next update from the readinessManager's updates channel.
    update := <-m.readinessManager.Updates()
    ready := update.Result == results.Success
    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

SetContainerReadiness() sets the container's ready state as well as the pod's; it is not analyzed further here.

livenessManager

livenessManager is where liveness probe results are stored. So how does kubelet learn that a container needs to be restarted?
Start with /pkg/kubelet/kubelet.go:

klet.probeManager = prober.NewManager(
    klet.statusManager,
    klet.livenessManager,
    klet.runner,
    containerRefManager,
    kubeDeps.Recorder)

As shown, when kubelet creates the proberManager it passes in its own livenessManager, so liveness probe results end up stored in kubelet's livenessManager.
The same livenessManager is also passed in when the dockerManager is created:

case "docker":
    runtime := dockertools.NewDockerManager(
        kubeDeps.DockerClient,
        kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
        klet.livenessManager,
        containerRefManager,
        klet.podManager,
        ......

Hence computePodContainerChanges() in /pkg/kubelet/dockertools/docker_manager.go contains:

liveness, found := dm.livenessManager.Get(containerStatus.ID)
if !found || liveness == proberesults.Success {
    containersToKeep[containerID] = index
    continue
}

That is, a container that is not present in the livenessManager, or whose recorded result is Success, is kept as-is; conversely, a container that is present in the livenessManager with a Failure result is not kept and will therefore be restarted.