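dockerManager is the part of kubelet that deals with Docker. It translates a pod's information into concrete Docker operations and manages the resulting Docker containers. Before introducing dockerManager itself, this article covers several other concepts it relies on: containerGC, ExecHandler, imageStatsProvider and instrumentedDockerInterface. Note that the implementations described here are the lowest-level ones in kubelet, and all of them are built on the Docker runtime.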

Let's start with containerGC.

containerGC

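The overall call chain for containerGC is kubelet → the containerGC of the container package → the containerGC of the Docker runtime. The full chain will be analyzed in a later article; this one covers only the Docker runtime's containerGC.
containerGC reclaims containers in the "dead" state, and only containers created by kubelet are collected.
First, let's look at ContainerGCPolicy, defined in /pkg/kubelet/container/container_gc.go: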

// Specified a policy for garbage collecting containers.
type ContainerGCPolicy struct {
	// Minimum age at which a container can be garbage collected, zero for no limit.
	MinAge time.Duration
	// Max number of dead containers any single pod (UID, container name) pair is
	// allowed to have, less than zero for no limit.
	MaxPerPodContainer int
	// Max number of total dead containers, less than zero for no limit.
	MaxContainers int
}

containerGC decides which containers to reclaim based on three fields of ContainerGCPolicy: MinAge, MaxPerPodContainer and MaxContainers. Their meanings are:

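  • MinAge: the minimum age a "dead" container must reach before it can be collected (zero means no limit); set by kubelet's --minimum-container-ttl-duration flag;
  • MaxPerPodContainer: if the number of "dead" containers for a single (pod UID, container name) pair exceeds this threshold, the excess containers are removed (a negative value means no limit); set by kubelet's --maximum-dead-containers-per-container flag;
  • MaxContainers: if the total number of "dead" containers exceeds MaxContainers, the excess is collected (a negative value means no limit); set by kubelet's --maximum-dead-containers flag.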

Note: kubelet's --minimum-container-ttl-duration, --maximum-dead-containers-per-container and --maximum-dead-containers flags are slated to be replaced by --eviction-soft and --eviction-hard.

The Docker runtime's containerGC is defined in /pkg/kubelet/dockertools/container_gc.go:

type containerGC struct {
	client           DockerInterface
	podGetter        podGetter
	containerLogsDir string
}

It is created with NewContainerGC():

func NewContainerGC(client DockerInterface, podGetter podGetter, containerLogsDir string) *containerGC {
	return &containerGC{
		client:           client,
		podGetter:        podGetter,
		containerLogsDir: containerLogsDir,
	}
}

The Docker runtime's containerGC implements the GarbageCollect() method:

//***Garbage-collect dead containers***//
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
	// Separate containers by evict units.
	//***Get the containers eligible for collection***//
	evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
	if err != nil {
		return err
	}
	// Remove unidentified containers.
	//***Remove "dead" containers whose names could not be parsed***//
	for _, container := range unidentifiedContainers {
		glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
		err = cgc.client.RemoveContainer(container.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
		if err != nil {
			glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err)
		}
	}
	// Remove deleted pod containers if all sources are ready.
	//***Remove all containers of pods that have been deleted***//
	if allSourcesReady {
		for key, unit := range evictUnits {
			if cgc.isPodDeleted(key.uid) {
				cgc.removeOldestN(unit, len(unit)) // Remove all.
				delete(evictUnits, key)
			}
		}
	}
	// Enforce max containers per evict unit.
	//***Keep the number of "dead" containers in each evict unit within MaxPerPodContainer***//
	if gcPolicy.MaxPerPodContainer >= 0 {
		cgc.enforceMaxContainersPerEvictUnit(evictUnits, gcPolicy.MaxPerPodContainer)
	}
	// Enforce max total number of containers.
	//***If the total number of "dead" containers exceeds the global limit, split the limit evenly across evict units, trim each unit to its share, then remove the oldest containers overall if still over the limit***//
	if gcPolicy.MaxContainers >= 0 && evictUnits.NumContainers() > gcPolicy.MaxContainers {
		// Leave an equal number of containers per evict unit (min: 1).
		numContainersPerEvictUnit := gcPolicy.MaxContainers / evictUnits.NumEvictUnits()
		if numContainersPerEvictUnit < 1 {
			numContainersPerEvictUnit = 1
		}
		cgc.enforceMaxContainersPerEvictUnit(evictUnits, numContainersPerEvictUnit)
		// If we still need to evict, evict oldest first.
		numContainers := evictUnits.NumContainers()
		if numContainers > gcPolicy.MaxContainers {
			flattened := make([]containerGCInfo, 0, numContainers)
			for uid := range evictUnits {
				flattened = append(flattened, evictUnits[uid]...)
			}
			sort.Sort(byCreated(flattened))
			cgc.removeOldestN(flattened, numContainers-gcPolicy.MaxContainers)
		}
	}
	// Remove dead symlinks - should only happen on upgrade
	// from a k8s version without proper log symlink cleanup
	logSymlinks, _ := filepath.Glob(path.Join(cgc.containerLogsDir, fmt.Sprintf("*.%s", LogSuffix)))
	for _, logSymlink := range logSymlinks {
		if _, err = os.Stat(logSymlink); os.IsNotExist(err) {
			err = os.Remove(logSymlink)
			if err != nil {
				glog.Warningf("Failed to remove container log dead symlink %q: %v", logSymlink, err)
			}
		}
	}
	return nil
}

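GarbageCollect() proceeds as follows:

  1. Call evictableContainers() to get the "dead" containers older than MinAge, grouped into evict units (one unit per pod UID and container name pair), together with the "unidentified" containers whose names could not be parsed;
  2. Remove every unidentified dead container;
  3. If all pod sources are ready, remove all containers that belong to pods which have been deleted;
  4. Call enforceMaxContainersPerEvictUnit() to keep the number of "dead" containers in each evict unit within MaxPerPodContainer;
  5. If the total number of "dead" containers still exceeds MaxContainers, give each evict unit an equal share (MaxContainers divided by the number of evict units, at least 1) and call enforceMaxContainersPerEvictUnit() to remove the excess; if the total is still too high, flatten all units, sort them by creation time and remove the oldest containers until the limit is met;
  6. Remove dead container log symlinks (those whose target no longer exists) in containerLogsDir.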

enforceMaxContainersPerEvictUnit() and the removeOldestN() helper it calls are defined as follows:

//***For each evict unit (pod UID + container name), remove the oldest "dead" containers beyond the limit***//
func (cgc *containerGC) enforceMaxContainersPerEvictUnit(evictUnits containersByEvictUnit, MaxContainers int) {
	for uid := range evictUnits {
		toRemove := len(evictUnits[uid]) - MaxContainers
		if toRemove > 0 {
			evictUnits[uid] = cgc.removeOldestN(evictUnits[uid], toRemove)
		}
	}
}
// Removes the oldest toRemove containers and returns the resulting slice.
func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo {
	// Remove from oldest to newest (last to first).
	numToKeep := len(containers) - toRemove
	for i := numToKeep; i < len(containers); i++ {
		cgc.removeContainer(containers[i].id, containers[i].podNameWithNamespace, containers[i].containerName)
	}
	// Assume we removed the containers so that we're not too aggressive.
	return containers[:numToKeep]
}

ExecHandler

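ExecHandler runs a command inside a container. The Docker runtime provides two implementations: one based on nsenter and one based on Docker's native exec.
The ExecHandler is first created in /pkg/kubelet/kubelet.go. It is chosen according to kubeCfg.DockerExecHandlerName, and unknown values fall back to native: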

var dockerExecHandler dockertools.ExecHandler
switch kubeCfg.DockerExecHandlerName {
case "native":
	dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
	dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
	glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
	dockerExecHandler = &dockertools.NativeExecHandler{}
}

ExecHandler is defined in /pkg/kubelet/dockertools/exec.go:

// ExecHandler knows how to execute a command in a running Docker container.
type ExecHandler interface {
	ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
}

In other words, any type that implements the ExecInContainer() method is an ExecHandler.

NsenterExecHandler

NsenterExecHandler uses the nsenter command to run a command inside the container's namespaces. It is defined in /pkg/kubelet/dockertools/exec.go:

type NsenterExecHandler struct{}

Its ExecInContainer() method is:

//***Execute a command inside the container***//
func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
	//***Locate the nsenter binary***//
	nsenter, err := exec.LookPath("nsenter")
	if err != nil {
		return fmt.Errorf("exec unavailable - unable to locate nsenter")
	}
	//***Get the PID of the container's main process***//
	containerPid := container.State.Pid
	// TODO what if the container doesn't have `env`???
	args := []string{"-t", fmt.Sprintf("%d", containerPid), "-m", "-i", "-u", "-n", "-p", "--", "env", "-i"}
	args = append(args, fmt.Sprintf("HOSTNAME=%s", container.Config.Hostname))
	args = append(args, container.Config.Env...)
	args = append(args, cmd...)
	command := exec.Command(nsenter, args...)
	if tty {
		//***Note: this := declares a new err scoped to the if block, so the error from command.Wait() below is not the err inspected after the if/else***//
		p, err := kubecontainer.StartPty(command)
		if err != nil {
			return err
		}
		defer p.Close()
		// make sure to close the stdout stream
		defer stdout.Close()
		kubecontainer.HandleResizing(resize, func(size term.Size) {
			term.SetSize(p.Fd(), size)
		})
		if stdin != nil {
			go io.Copy(p, stdin)
		}
		if stdout != nil {
			go io.Copy(stdout, p)
		}
		err = command.Wait()
	} else {
		if stdin != nil {
			// Use an os.Pipe here as it returns true *os.File objects.
			// This way, if you run 'kubectl exec <pod> -i bash' (no tty) and type 'exit',
			// the call below to command.Run() can unblock because its Stdin is the read half
			// of the pipe.
			r, w, err := os.Pipe()
			if err != nil {
				return err
			}
			go io.Copy(w, stdin)
			command.Stdin = r
		}
		if stdout != nil {
			command.Stdout = stdout
		}
		if stderr != nil {
			command.Stderr = stderr
		}
		err = command.Run()
	}
	if exitErr, ok := err.(*exec.ExitError); ok {
		return &utilexec.ExitErrorWrapper{ExitError: exitErr}
	}
	return err
}

NativeExecHandler

NativeExecHandler runs the command through Docker's exec API. It is defined in /pkg/kubelet/dockertools/exec.go:

type NativeExecHandler struct{}

Its ExecInContainer() is defined as follows:

func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
	createOpts := dockertypes.ExecConfig{
		Cmd:          cmd,
		AttachStdin:  stdin != nil,
		AttachStdout: stdout != nil,
		AttachStderr: stderr != nil,
		Tty:          tty,
	}
	//***Create the exec instance***//
	execObj, err := client.CreateExec(container.ID, createOpts)
	if err != nil {
		return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err)
	}
	// Have to start this before the call to client.StartExec because client.StartExec is a blocking
	// call :-( Otherwise, resize events don't get processed and the terminal never resizes.
	kubecontainer.HandleResizing(resize, func(size term.Size) {
		client.ResizeExecTTY(execObj.ID, int(size.Height), int(size.Width))
	})
	startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
	streamOpts := StreamOptions{
		InputStream:  stdin,
		OutputStream: stdout,
		ErrorStream:  stderr,
		RawTerminal:  tty,
	}
	//***Start the exec instance (a blocking call)***//
	err = client.StartExec(execObj.ID, startOpts, streamOpts)
	if err != nil {
		return err
	}
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	count := 0
	for {
		inspect, err2 := client.InspectExec(execObj.ID)
		if err2 != nil {
			return err2
		}
		//***Once the exec is no longer running, stop polling; a non-zero ExitCode is returned as a dockerExitError***//
		if !inspect.Running {
			if inspect.ExitCode != 0 {
				err = &dockerExitError{inspect}
			}
			break
		}
		count++
		if count == 5 {
			glog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID)
			break
		}
		<-ticker.C
	}
	return err
}

imageStatsProvider

imageStatsProvider reports statistics about the images currently available on the node. It is defined in /pkg/kubelet/dockertools/images.go:

// imageStatsProvider exposes stats about all images currently available.
type imageStatsProvider struct {
	sync.Mutex
	// layers caches the current layers, key is the layer ID.
	layers map[string]*dockertypes.ImageHistory
	// imageToLayerIDs maps image to its layer IDs.
	imageToLayerIDs map[string][]string
	// Docker remote API client
	c DockerInterface
}
func newImageStatsProvider(c DockerInterface) *imageStatsProvider {
	return &imageStatsProvider{
		layers:          make(map[string]*dockertypes.ImageHistory),
		imageToLayerIDs: make(map[string][]string),
		c:               c,
	}
}

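Image statistics are obtained through the ImageStats() method. It caches each image's layers and computes the total storage used by all distinct layers: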

//***Report image statistics***//
func (isp *imageStatsProvider) ImageStats() (*runtime.ImageStats, error) {
	images, err := isp.c.ListImages(dockertypes.ImageListOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to list docker images - %v", err)
	}
	// Take the lock to protect the cache
	isp.Lock()
	defer isp.Unlock()
	// Create new cache each time, this is a little more memory consuming, but:
	// * ImageStats is only called every 10 seconds
	// * We use pointers and reference to copy cache elements.
	// The memory usage should be acceptable.
	// TODO(random-liu): Add more logic to implement in place cache update.
	newLayers := make(map[string]*dockertypes.ImageHistory)
	newImageToLayerIDs := make(map[string][]string)
	for _, image := range images {
		layerIDs, ok := isp.imageToLayerIDs[image.ID]
		if !ok {
			// Get information about the various layers of the given docker image.
			history, err := isp.c.ImageHistory(image.ID)
			if err != nil {
				// Skip the image and inspect again in next ImageStats if the image is still there
				glog.V(2).Infof("failed to get history of docker image %+v - %v", image, err)
				continue
			}
			// Cache each layer
			for i := range history {
				layer := &history[i]
				key := layer.ID
				// Some of the layers are empty.
				// We are hoping that these layers are unique to each image.
				// Still keying with the CreatedBy field to be safe.
				if key == "" || key == "<missing>" {
					key = key + layer.CreatedBy
				}
				layerIDs = append(layerIDs, key)
				newLayers[key] = layer
			}
		} else {
			for _, layerID := range layerIDs {
				newLayers[layerID] = isp.layers[layerID]
			}
		}
		newImageToLayerIDs[image.ID] = layerIDs
	}
	ret := &runtime.ImageStats{}
	// Calculate the total storage bytes
	for _, layer := range newLayers {
		ret.TotalStorageBytes += uint64(layer.Size)
	}
	// Update current cache
	isp.layers = newLayers
	isp.imageToLayerIDs = newImageToLayerIDs
	return ret, nil
}

instrumentedDockerInterface

instrumentedDockerInterface wraps kubeDockerClient and adds metrics recording (operation counts, latencies, errors and timeouts). It is defined in /pkg/kubelet/dockertools/instrumented_docker.go:

type instrumentedDockerInterface struct {
	client DockerInterface
}
// Creates an instrumented DockerInterface from an existing DockerInterface.
//***Create an instrumentedDockerInterface***//
func NewInstrumentedDockerInterface(dockerClient DockerInterface) DockerInterface {
	return instrumentedDockerInterface{
		client: dockerClient,
	}
}

The recording functions are defined as follows:

// recordOperation records the duration of the operation.
func recordOperation(operation string, start time.Time) {
	metrics.DockerOperations.WithLabelValues(operation).Inc()
	metrics.DockerOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInMicroseconds(start))
}
// recordError records error for metric if an error occurred.
func recordError(operation string, err error) {
	if err != nil {
		if _, ok := err.(operationTimeout); ok {
			metrics.DockerOperationsTimeout.WithLabelValues(operation).Inc()
		}
		// Docker operation timeout error is also a docker error, so we don't add else here.
		metrics.DockerOperationsErrors.WithLabelValues(operation).Inc()
	}
}

Each wrapped method follows the same pattern, for example ListContainers():

//***List containers***//
func (in instrumentedDockerInterface) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
	const operation = "list_containers"
	defer recordOperation(operation, time.Now())
	out, err := in.client.ListContainers(options)
	recordError(operation, err)
	return out, err
}

The rest of instrumentedDockerInterface's wrappers follow the same pattern and are not covered in detail here. After all, what kubelet calls as its runtime is DockerManager.
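The wrapper is a plain decorator: it implements the same interface as the client it wraps and records metrics around each call. The following self-contained sketch shows the pattern. It uses hypothetical names (containerLister, opMetrics, instrumentedLister, flakyLister) and simple maps in place of kubelet's metrics package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// containerLister is a hypothetical one-method slice of DockerInterface.
type containerLister interface {
	ListContainers() ([]string, error)
}

// opMetrics is a toy stand-in for the metrics package used by kubelet.
type opMetrics struct {
	calls    map[string]int
	failures map[string]int
	latency  map[string]time.Duration
}

func newOpMetrics() *opMetrics {
	return &opMetrics{calls: map[string]int{}, failures: map[string]int{}, latency: map[string]time.Duration{}}
}

func (m *opMetrics) recordOperation(op string, start time.Time) {
	m.calls[op]++
	m.latency[op] += time.Since(start)
}

func (m *opMetrics) recordError(op string, err error) {
	if err != nil {
		m.failures[op]++
	}
}

// instrumentedLister decorates another containerLister with metrics,
// mirroring the shape of instrumentedDockerInterface.ListContainers.
type instrumentedLister struct {
	client  containerLister
	metrics *opMetrics
}

func (in instrumentedLister) ListContainers() ([]string, error) {
	const operation = "list_containers"
	// time.Now() is evaluated here, when the defer statement runs.
	defer in.metrics.recordOperation(operation, time.Now())
	out, err := in.client.ListContainers()
	in.metrics.recordError(operation, err)
	return out, err
}

// flakyLister is a fake client that fails on every second call.
type flakyLister struct{ n *int }

func (f flakyLister) ListContainers() ([]string, error) {
	*f.n++
	if *f.n%2 == 0 {
		return nil, errors.New("docker daemon unavailable")
	}
	return []string{"c1", "c2"}, nil
}

func main() {
	m := newOpMetrics()
	var client containerLister = instrumentedLister{client: flakyLister{n: new(int)}, metrics: m}
	for i := 0; i < 3; i++ {
		client.ListContainers()
	}
	fmt.Println(m.calls["list_containers"], m.failures["list_containers"])
}
```

As in the original ListContainers(), the arguments of a deferred call are evaluated immediately. time.Now() therefore captures the start time, and the deferred recordOperation() measures the whole call.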

Next, let's see how the client inside instrumentedDockerInterface is created.

This client is created by ConnectToDockerOrDie(), defined in /pkg/kubelet/dockertools/docker.go:

func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
	if dockerEndpoint == "fake://" {
		return NewFakeDockerClient()
	}
	client, err := getDockerClient(dockerEndpoint)
	if err != nil {
		glog.Fatalf("Couldn't connect to docker: %v", err)
	}
	glog.Infof("Start docker client with request timeout=%v", requestTimeout)
	return newKubeDockerClient(client, requestTimeout)
}
//***If dockerEndpoint is set, create the docker client from it; otherwise create it from environment variables***//
func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) {
	if len(dockerEndpoint) > 0 {
		glog.Infof("Connecting to docker on %s", dockerEndpoint)
		return dockerapi.NewClient(dockerEndpoint, "", nil, nil)
	}
	return dockerapi.NewEnvClient()
}

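For the special endpoint "fake://", ConnectToDockerOrDie() returns a fake client. Otherwise it calls getDockerClient() to create a Docker client and then wraps it with newKubeDockerClient(). If getDockerClient() returns an error, kubelet exits through glog.Fatalf (hence "OrDie").
newKubeDockerClient() is defined in /kube_docker_client.go: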

// newKubeDockerClient creates an kubeDockerClient from an existing docker client. If requestTimeout is 0,
// defaultTimeout will be applied.
func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface {
	if requestTimeout == 0 {
		requestTimeout = defaultTimeout
	}
	k := &kubeDockerClient{
		client:  dockerClient,
		timeout: requestTimeout,
	}
	// Notice that this assumes that docker is running before kubelet is started.
	v, err := k.Version()
	if err != nil {
		glog.Errorf("failed to retrieve docker version: %v", err)
		glog.Warningf("Using empty version for docker client, this may sometimes cause compatibility issue.")
	} else {
		// Update client version with real api version.
		dockerClient.UpdateClientVersion(v.APIVersion)
	}
	return k
}

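newKubeDockerClient adds a timeout mechanism on top of the native client: it stores requestTimeout as the request timeout, falling back to defaultTimeout when requestTimeout is 0. It also queries the Docker daemon's version and updates the client's API version to match. kubeDockerClient provides the methods for operating on containers.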

To sum up the client layers: kubeDockerClient adds timeouts on top of Docker's native client, and instrumentedDockerInterface adds metrics recording on top of kubeDockerClient.