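dockerManager is the part of kubelet that talks to docker. It translates pod information into concrete docker operations and manages docker containers. Before looking at dockerManager itself, this post covers a few building blocks it relies on: containerGC, ExecHandler, imageStatsProvider, and instrumentedDockerInterface. The implementations described here are the lowest-level ones in kubelet, and all of them are specific to the docker runtime.
Let's start with containerGC.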
containerGC
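The overall call chain is kubelet -> the container package's containerGC -> the docker runtime's containerGC; that chain will be analyzed in a later post. This post only covers the docker runtime's containerGC.
containerGC reclaims containers in the "dead" state. Only containers created by kubelet are reclaimed.
First, ContainerGCPolicy, defined in /pkg/kubelet/container/container_gc.go: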
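```go
type ContainerGCPolicy struct {
	// Minimum age at which a container can be garbage collected.
	MinAge time.Duration

	// Maximum number of dead containers kept per (pod, container name) pair.
	MaxPerPodContainer int

	// Maximum number of dead containers kept in total.
	MaxContainers int
}
```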
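containerGC reclaims containers according to the MinAge, MaxPerPodContainer, and MaxContainers fields of ContainerGCPolicy:
- MinAge: the minimum age a "dead" container must reach before it becomes eligible for collection; set by kubelet's --minimum-container-ttl-duration flag;
- MaxPerPodContainer: the maximum number of "dead" containers kept for each (pod, container name) pair; containers beyond this threshold are removed, and a negative value disables the limit; set by kubelet's --maximum-dead-containers-per-container flag;
- MaxContainers: the maximum total number of "dead" containers; the excess is reclaimed, and a negative value disables the limit; set by kubelet's --maximum-dead-containers flag.
Note: kubelet's --minimum-container-ttl-duration, --maximum-dead-containers-per-container, and --maximum-dead-containers flags are slated to be replaced by --eviction-soft and --eviction-hard.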
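The MinAge rule can be illustrated with a minimal sketch. This is not kubelet code: `deadContainer`, `eligible`, and `demo` are hypothetical names, and the sketch assumes each container carries a single reference timestamp from which its age is measured.

```go
package main

import (
	"fmt"
	"time"
)

// deadContainer is a hypothetical stand-in for what the GC knows about an
// exited container.
type deadContainer struct {
	name      string
	timestamp time.Time // reference time from which the age is measured (assumption)
}

// eligible returns the containers whose age at time now is at least minAge.
func eligible(containers []deadContainer, now time.Time, minAge time.Duration) []deadContainer {
	cutoff := now.Add(-minAge)
	var out []deadContainer
	for _, c := range containers {
		if !c.timestamp.After(cutoff) {
			out = append(out, c)
		}
	}
	return out
}

// demo returns the names of the containers that pass a one-minute MinAge.
func demo() []string {
	now := time.Date(2017, 1, 1, 12, 0, 0, 0, time.UTC)
	containers := []deadContainer{
		{name: "old", timestamp: now.Add(-10 * time.Minute)},
		{name: "fresh", timestamp: now.Add(-10 * time.Second)},
	}
	var names []string
	for _, c := range eligible(containers, now, time.Minute) {
		names = append(names, c.name)
	}
	return names
}

func main() {
	fmt.Println(demo())
}
```

Only the ten-minute-old container passes the filter; the ten-second-old one stays until a later GC round.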
The docker runtime's containerGC is defined in /pkg/kubelet/dockertools/container_gc.go:
```go
type containerGC struct {
	client           DockerInterface
	podGetter        podGetter
	containerLogsDir string
}
```
It is created with NewContainerGC():
```go
func NewContainerGC(client DockerInterface, podGetter podGetter, containerLogsDir string) *containerGC {
	return &containerGC{
		client:           client,
		podGetter:        podGetter,
		containerLogsDir: containerLogsDir,
	}
}
```
The docker runtime's containerGC implements the GarbageCollect() method:
```go
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
	// Separate containers by evict units.
	evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
	if err != nil {
		return err
	}

	// Remove unidentified containers.
	for _, container := range unidentifiedContainers {
		glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
		err = cgc.client.RemoveContainer(container.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
		if err != nil {
			glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err)
		}
	}

	// Remove containers of deleted pods, but only if all sources are ready.
	if allSourcesReady {
		for key, unit := range evictUnits {
			if cgc.isPodDeleted(key.uid) {
				cgc.removeOldestN(unit, len(unit)) // Remove all.
				delete(evictUnits, key)
			}
		}
	}

	// Enforce the maximum number of containers per evict unit.
	if gcPolicy.MaxPerPodContainer >= 0 {
		cgc.enforceMaxContainersPerEvictUnit(evictUnits, gcPolicy.MaxPerPodContainer)
	}

	// Enforce the maximum total number of containers.
	if gcPolicy.MaxContainers >= 0 && evictUnits.NumContainers() > gcPolicy.MaxContainers {
		// Leave an equal number of containers per evict unit (minimum 1).
		numContainersPerEvictUnit := gcPolicy.MaxContainers / evictUnits.NumEvictUnits()
		if numContainersPerEvictUnit < 1 {
			numContainersPerEvictUnit = 1
		}
		cgc.enforceMaxContainersPerEvictUnit(evictUnits, numContainersPerEvictUnit)

		// If there are still too many containers, evict the oldest first.
		numContainers := evictUnits.NumContainers()
		if numContainers > gcPolicy.MaxContainers {
			flattened := make([]containerGCInfo, 0, numContainers)
			for uid := range evictUnits {
				flattened = append(flattened, evictUnits[uid]...)
			}
			sort.Sort(byCreated(flattened))

			cgc.removeOldestN(flattened, numContainers-gcPolicy.MaxContainers)
		}
	}

	// Remove dead log symlinks.
	logSymlinks, _ := filepath.Glob(path.Join(cgc.containerLogsDir, fmt.Sprintf("*.%s", LogSuffix)))
	for _, logSymlink := range logSymlinks {
		if _, err = os.Stat(logSymlink); os.IsNotExist(err) {
			err = os.Remove(logSymlink)
			if err != nil {
				glog.Warningf("Failed to remove container log dead symlink %q: %v", logSymlink, err)
			}
		}
	}

	return nil
}
```
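GarbageCollect() works as follows:
- Call evictableContainers() to get the "dead" containers older than MinAge, grouped into evict units; containers that cannot be identified are returned separately and removed right away;
- If all sources are ready, reclaim the containers whose pod has already been deleted;
- Call enforceMaxContainersPerEvictUnit() to keep the number of "dead" containers for each (pod, container) pair within MaxPerPodContainer;
- If the total number of "dead" containers still exceeds MaxContainers, compute how many each (pod, container) pair may keep (MaxContainers divided by the number of evict units, at least 1) and call enforceMaxContainersPerEvictUnit() again; if the total is still too high, sort the remaining containers by creation time and remove the oldest ones;
- Finally, remove dangling container log symlinks under containerLogsDir.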
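The two capping steps can be simulated on plain in-memory data. The following is a rough sketch rather than the kubelet implementation: `gcInfo`, `evictUnits`, `keepNewest`, `collect`, and `demo` are hypothetical names, creation time is reduced to an integer, and each per-unit slice is assumed to be ordered newest first. Instead of deleting anything, it returns the IDs that would be removed.

```go
package main

import (
	"fmt"
	"sort"
)

// gcInfo is a hypothetical, simplified record for one dead container.
type gcInfo struct {
	id      string
	created int // larger means newer; stands in for a creation timestamp
}

// evictUnits maps a hypothetical "pod/container" key to its dead
// containers, each slice ordered newest first.
type evictUnits map[string][]gcInfo

func (u evictUnits) total() int {
	n := 0
	for _, cs := range u {
		n += len(cs)
	}
	return n
}

// keepNewest trims a newest-first slice to at most max entries and returns
// the kept entries plus the IDs that would be removed.
func keepNewest(cs []gcInfo, max int) (kept []gcInfo, removed []string) {
	if len(cs) <= max {
		return cs, nil
	}
	for _, c := range cs[max:] {
		removed = append(removed, c.id)
	}
	return cs[:max], removed
}

// collect applies the per-unit cap, then the global cap, and returns the
// IDs that would be removed, sorted for stable output.
func collect(units evictUnits, maxPerUnit, maxTotal int) []string {
	var removed []string
	enforce := func(max int) {
		for key, cs := range units {
			kept, r := keepNewest(cs, max)
			units[key] = kept
			removed = append(removed, r...)
		}
	}
	if maxPerUnit >= 0 {
		enforce(maxPerUnit)
	}
	if maxTotal >= 0 && units.total() > maxTotal {
		// Equal share per unit, but never less than one container.
		perUnit := maxTotal / len(units)
		if perUnit < 1 {
			perUnit = 1
		}
		enforce(perUnit)
		if units.total() > maxTotal {
			var flat []gcInfo
			for _, cs := range units {
				flat = append(flat, cs...)
			}
			// Newest first, so the oldest containers sit at the tail.
			sort.Slice(flat, func(i, j int) bool { return flat[i].created > flat[j].created })
			_, r := keepNewest(flat, maxTotal)
			removed = append(removed, r...)
		}
	}
	sort.Strings(removed)
	return removed
}

func demo() []string {
	units := evictUnits{
		"pod-a/web": {{"a3", 30}, {"a2", 20}, {"a1", 10}},
		"pod-b/db":  {{"b2", 25}, {"b1", 15}},
		"pod-c/log": {{"c1", 5}},
	}
	return collect(units, 2, 2)
}

func main() {
	fmt.Println(demo())
}
```

With a per-unit cap of 2 and a global cap of 2, the per-unit pass drops a1, the equal-share pass (2 / 3 rounds down to 0, so it is raised to 1) drops a2 and b1, and the final oldest-first pass drops c1.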
enforceMaxContainersPerEvictUnit() is defined as follows. Note that removeOldestN() removes entries from the tail of the slice, which implies the slice is ordered newest first:
```go
func (cgc *containerGC) enforceMaxContainersPerEvictUnit(evictUnits containersByEvictUnit, MaxContainers int) {
	for uid := range evictUnits {
		toRemove := len(evictUnits[uid]) - MaxContainers

		if toRemove > 0 {
			evictUnits[uid] = cgc.removeOldestN(evictUnits[uid], toRemove)
		}
	}
}

// removeOldestN removes the last toRemove containers and returns the rest.
func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int) []containerGCInfo {
	// Remove from oldest to newest (last to first).
	numToKeep := len(containers) - toRemove
	for i := numToKeep; i < len(containers); i++ {
		cgc.removeContainer(containers[i].id, containers[i].podNameWithNamespace, containers[i].containerName)
	}

	// Assume we removed the containers so that we're not too aggressive.
	return containers[:numToKeep]
}
```
ExecHandler
An ExecHandler runs a command inside a container. The docker runtime provides two implementations: nsenter and docker's native exec.
The ExecHandler is first created in /pkg/kubelet/kubelet.go:
```go
var dockerExecHandler dockertools.ExecHandler
switch kubeCfg.DockerExecHandlerName {
case "native":
	dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
	dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
	glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
	dockerExecHandler = &dockertools.NativeExecHandler{}
}
```
ExecHandler is defined in /pkg/kubelet/dockertools/exec.go:
```go
type ExecHandler interface {
	ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
}
```
In other words, any type that implements the ExecInContainer() method is an ExecHandler.
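The selection logic and the implicit interface satisfaction can be shown with a small sketch. The interface below is deliberately simplified and hypothetical: the real ExecInContainer() takes a Docker client, container details, and I/O streams, while `execHandler`, `nativeHandler`, `nsenterHandler`, `selectHandler`, and `handlerName` exist only for this illustration.

```go
package main

import "fmt"

// execHandler is a hypothetical, simplified version of the interface.
type execHandler interface {
	ExecInContainer(containerID string, cmd []string) error
}

// nativeHandler and nsenterHandler satisfy execHandler simply by having the
// method; Go needs no explicit "implements" declaration.
type nativeHandler struct{}

func (*nativeHandler) ExecInContainer(containerID string, cmd []string) error {
	fmt.Printf("native exec in %s: %v\n", containerID, cmd)
	return nil
}

type nsenterHandler struct{}

func (*nsenterHandler) ExecInContainer(containerID string, cmd []string) error {
	fmt.Printf("nsenter exec in %s: %v\n", containerID, cmd)
	return nil
}

// selectHandler mirrors the switch shown earlier: unknown names fall back
// to the native handler.
func selectHandler(name string) execHandler {
	switch name {
	case "nsenter":
		return &nsenterHandler{}
	default:
		return &nativeHandler{}
	}
}

// handlerName reports which implementation was chosen (demo helper only).
func handlerName(h execHandler) string {
	switch h.(type) {
	case *nativeHandler:
		return "native"
	case *nsenterHandler:
		return "nsenter"
	}
	return "unknown"
}

func main() {
	for _, name := range []string{"native", "nsenter", "bogus"} {
		h := selectHandler(name)
		fmt.Printf("%q -> %s\n", name, handlerName(h))
		_ = h.ExecInContainer("abc123", []string{"ls"})
	}
}
```

The caller only depends on the interface, so swapping the exec mechanism is a matter of configuration.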
NsenterExecHandler
NsenterExecHandler uses the nsenter command to run a command inside a container. It is defined in /pkg/kubelet/dockertools/exec.go:
```go
type NsenterExecHandler struct{}
```
Its ExecInContainer() method is:
```go
func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
	nsenter, err := exec.LookPath("nsenter")
	if err != nil {
		return fmt.Errorf("exec unavailable - unable to locate nsenter")
	}

	containerPid := container.State.Pid

	// Enter the namespaces of the container's process, then start the
	// command through env with a clean environment.
	args := []string{"-t", fmt.Sprintf("%d", containerPid), "-m", "-i", "-u", "-n", "-p", "--", "env", "-i"}
	args = append(args, fmt.Sprintf("HOSTNAME=%s", container.Config.Hostname))
	args = append(args, container.Config.Env...)
	args = append(args, cmd...)
	command := exec.Command(nsenter, args...)
	if tty {
		p, err := kubecontainer.StartPty(command)
		if err != nil {
			return err
		}
		defer p.Close()

		// Make sure to close the stdout stream.
		defer stdout.Close()

		kubecontainer.HandleResizing(resize, func(size term.Size) {
			term.SetSize(p.Fd(), size)
		})

		if stdin != nil {
			go io.Copy(p, stdin)
		}

		if stdout != nil {
			go io.Copy(stdout, p)
		}

		err = command.Wait()
	} else {
		if stdin != nil {
			// Use an os.Pipe here as it returns true *os.File objects.
			r, w, err := os.Pipe()
			if err != nil {
				return err
			}
			go io.Copy(w, stdin)

			command.Stdin = r
		}
		if stdout != nil {
			command.Stdout = stdout
		}
		if stderr != nil {
			command.Stderr = stderr
		}

		err = command.Run()
	}

	if exitErr, ok := err.(*exec.ExitError); ok {
		return &utilexec.ExitErrorWrapper{ExitError: exitErr}
	}

	return err
}
```
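To see what command line this produces, the argument assembly can be pulled out into a pure function. `buildNsenterArgs` and `demo` are hypothetical helpers written for this illustration; the flag list itself is copied from the method above. In nsenter, -t names the target process, and -m, -i, -u, -n, -p enter its mount, IPC, UTS, network, and PID namespaces; `env -i` then starts the command with an empty environment plus the variables listed after it.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildNsenterArgs is a hypothetical helper that assembles the nsenter
// argument list the same way the method above does.
func buildNsenterArgs(pid int, hostname string, env, cmd []string) []string {
	args := []string{"-t", strconv.Itoa(pid), "-m", "-i", "-u", "-n", "-p", "--", "env", "-i"}
	args = append(args, "HOSTNAME="+hostname)
	args = append(args, env...)
	args = append(args, cmd...)
	return args
}

// demo renders the full command line for a made-up container.
func demo() string {
	args := buildNsenterArgs(4242, "web-0", []string{"PATH=/usr/bin:/bin"}, []string{"ls", "-l"})
	return "nsenter " + strings.Join(args, " ")
}

func main() {
	fmt.Println(demo())
	// prints: nsenter -t 4242 -m -i -u -n -p -- env -i HOSTNAME=web-0 PATH=/usr/bin:/bin ls -l
}
```

Because the environment is rebuilt from the container's configuration, the command does not inherit kubelet's own environment variables.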
NativeExecHandler
NativeExecHandler runs a command inside a container through the exec API that Docker provides. It is defined in /pkg/kubelet/dockertools/exec.go:
```go
type NativeExecHandler struct{}
```
Its ExecInContainer() method is:
```go
func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
	createOpts := dockertypes.ExecConfig{
		Cmd:          cmd,
		AttachStdin:  stdin != nil,
		AttachStdout: stdout != nil,
		AttachStderr: stderr != nil,
		Tty:          tty,
	}
	execObj, err := client.CreateExec(container.ID, createOpts)
	if err != nil {
		return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err)
	}

	// Forward terminal resize events to the exec session.
	kubecontainer.HandleResizing(resize, func(size term.Size) {
		client.ResizeExecTTY(execObj.ID, int(size.Height), int(size.Width))
	})

	startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
	streamOpts := StreamOptions{
		InputStream:  stdin,
		OutputStream: stdout,
		ErrorStream:  stderr,
		RawTerminal:  tty,
	}
	err = client.StartExec(execObj.ID, startOpts, streamOpts)
	if err != nil {
		return err
	}

	// Poll the exec session every 2 seconds, at most 5 times.
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	count := 0
	for {
		inspect, err2 := client.InspectExec(execObj.ID)
		if err2 != nil {
			return err2
		}
		if !inspect.Running {
			if inspect.ExitCode != 0 {
				err = &dockerExitError{inspect}
			}
			break
		}

		count++
		if count == 5 {
			glog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID)
			break
		}

		<-ticker.C
	}

	return err
}
```
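The polling loop at the end of the method is a bounded wait: inspect, stop if the process has exited, otherwise wait for the next tick, and give up after a fixed number of attempts. The sketch below isolates that pattern. `execState`, `waitForExit`, `fakeInspect`, and `demo` are hypothetical names, and unlike the method above, which only logs when it gives up, this version returns an error so the outcome is visible to the caller.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// execState is a hypothetical, simplified result of inspecting an exec session.
type execState struct {
	running  bool
	exitCode int
}

var errStillRunning = errors.New("exec session still running after max polls")

// waitForExit calls inspect until the session stops or maxPolls inspections
// have been made, sleeping one ticker interval between inspections.
func waitForExit(inspect func() (execState, error), interval time.Duration, maxPolls int) (int, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for polls := 1; ; polls++ {
		st, err := inspect()
		if err != nil {
			return 0, err
		}
		if !st.running {
			return st.exitCode, nil
		}
		if polls == maxPolls {
			return 0, errStillRunning
		}
		<-ticker.C
	}
}

// fakeInspect reports "running" for the first n calls, then an exit code.
func fakeInspect(n, exitCode int) func() (execState, error) {
	calls := 0
	return func() (execState, error) {
		calls++
		if calls <= n {
			return execState{running: true}, nil
		}
		return execState{exitCode: exitCode}, nil
	}
}

// demo waits on a fake session that keeps running for runningPolls inspections.
func demo(runningPolls int) (int, error) {
	return waitForExit(fakeInspect(runningPolls, 3), time.Millisecond, 5)
}

func main() {
	code, err := demo(2)
	fmt.Println(code, err)
	_, err = demo(10)
	fmt.Println(err)
}
```

The first call sees the process exit on the third inspection and reports its exit code; the second gives up after five inspections.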
imageStatsProvider
imageStatsProvider reports image statistics. It is defined in /pkg/kubelet/dockertools/images.go:
```go
type imageStatsProvider struct {
	sync.Mutex
	// layers caches the current layers, key is the layer ID.
	layers map[string]*dockertypes.ImageHistory
	// imageToLayerIDs maps image to its layer IDs.
	imageToLayerIDs map[string][]string
	// c is the Docker client.
	c DockerInterface
}

func newImageStatsProvider(c DockerInterface) *imageStatsProvider {
	return &imageStatsProvider{
		layers:          make(map[string]*dockertypes.ImageHistory),
		imageToLayerIDs: make(map[string][]string),
		c:               c,
	}
}
```
The ImageStats() method returns the statistics. It counts each layer once, even when several images share it, and reuses the cached layer list for images it has already seen:
```go
func (isp *imageStatsProvider) ImageStats() (*runtime.ImageStats, error) {
	images, err := isp.c.ListImages(dockertypes.ImageListOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to list docker images - %v", err)
	}
	// Take the lock to protect the cache.
	isp.Lock()
	defer isp.Unlock()
	// Create new caches for the layers and the image-to-layer mapping.
	newLayers := make(map[string]*dockertypes.ImageHistory)
	newImageToLayerIDs := make(map[string][]string)
	for _, image := range images {
		layerIDs, ok := isp.imageToLayerIDs[image.ID]
		if !ok {
			// Not cached yet: fetch the image history from docker.
			history, err := isp.c.ImageHistory(image.ID)
			if err != nil {
				// Skip the image and inspect it again in the next ImageStats call if possible.
				glog.V(2).Infof("failed to get history of docker image %+v - %v", image, err)
				continue
			}
			// Cache each layer.
			for i := range history {
				layer := &history[i]
				key := layer.ID
				// Some layers have an empty or "<missing>" ID; key them by
				// the command that created them as well.
				if key == "" || key == "<missing>" {
					key = key + layer.CreatedBy
				}
				layerIDs = append(layerIDs, key)
				newLayers[key] = layer
			}
		} else {
			// Cached: carry the known layers over to the new cache.
			for _, layerID := range layerIDs {
				newLayers[layerID] = isp.layers[layerID]
			}
		}
		newImageToLayerIDs[image.ID] = layerIDs
	}

	ret := &runtime.ImageStats{}
	// Sum the size of every distinct layer.
	for _, layer := range newLayers {
		ret.TotalStorageBytes += uint64(layer.Size)
	}
	// Replace the old caches with the new ones.
	isp.layers = newLayers
	isp.imageToLayerIDs = newImageToLayerIDs
	return ret, nil
}
```
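The key idea, counting a shared layer only once, can be shown without a Docker client. In this minimal sketch `layer`, `layerKey`, `totalStorageBytes`, and `demo` are hypothetical names and the image data is made up; only the keying rule for layers with an empty or "<missing>" ID follows the method above.

```go
package main

import "fmt"

// layer is a hypothetical, simplified image-history entry.
type layer struct {
	id        string
	createdBy string
	size      int64
}

// layerKey returns the cache key for a layer. Layers without a usable ID
// are keyed by the ID plus the command that created them.
func layerKey(l layer) string {
	if l.id == "" || l.id == "<missing>" {
		return l.id + l.createdBy
	}
	return l.id
}

// totalStorageBytes sums the size of every distinct layer across all images.
func totalStorageBytes(images map[string][]layer) uint64 {
	seen := make(map[string]int64)
	for _, history := range images {
		for _, l := range history {
			seen[layerKey(l)] = l.size
		}
	}
	var total uint64
	for _, size := range seen {
		total += uint64(size)
	}
	return total
}

// demo builds two images that share a base layer.
func demo() uint64 {
	base := layer{id: "sha256:base", size: 100}
	images := map[string][]layer{
		"app-a": {base, {id: "sha256:a", size: 20}},
		"app-b": {base, {id: "<missing>", createdBy: "RUN make", size: 7}},
	}
	return totalStorageBytes(images)
}

func main() {
	fmt.Println(demo()) // prints 127, not the naive per-image sum of 227
}
```

Summing per image would count the 100-byte base layer twice; keying by layer avoids that.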
instrumentedDockerInterface
instrumentedDockerInterface wraps kubeDockerClient and adds metrics recording. It is defined in /pkg/kubelet/dockertools/instrumented_docker.go:
```go
type instrumentedDockerInterface struct {
	client DockerInterface
}

// NewInstrumentedDockerInterface wraps a DockerInterface so that every call is recorded.
func NewInstrumentedDockerInterface(dockerClient DockerInterface) DockerInterface {
	return instrumentedDockerInterface{
		client: dockerClient,
	}
}
```
The recording functions are defined as follows:
```go
// recordOperation records the call count and latency of an operation.
func recordOperation(operation string, start time.Time) {
	metrics.DockerOperations.WithLabelValues(operation).Inc()
	metrics.DockerOperationsLatency.WithLabelValues(operation).Observe(metrics.SinceInMicroseconds(start))
}

// recordError records an error of the operation, if any.
func recordError(operation string, err error) {
	if err != nil {
		if _, ok := err.(operationTimeout); ok {
			metrics.DockerOperationsTimeout.WithLabelValues(operation).Inc()
		}
		// Docker operation timeout errors are also counted as operation errors.
		metrics.DockerOperationsErrors.WithLabelValues(operation).Inc()
	}
}
```
For example, in ListContainers():
```go
func (in instrumentedDockerInterface) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
	const operation = "list_containers"
	// time.Now() is evaluated here; recordOperation runs when the method returns.
	defer recordOperation(operation, time.Now())

	out, err := in.client.ListContainers(options)
	recordError(operation, err)
	return out, err
}
```
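This is the decorator pattern: the wrapper implements the same interface as the client it wraps, so callers cannot tell the difference. A self-contained sketch follows, with in-memory counters standing in for the Prometheus metrics. `lister`, `stats`, `instrumented`, `fakeClient`, and `demo` are hypothetical names, and the interface is reduced to one method for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// lister is a hypothetical one-method slice of the Docker client interface.
type lister interface {
	ListContainers() ([]string, error)
}

// stats is an in-memory stand-in for the metrics used by the kubelet.
type stats struct {
	calls   map[string]int
	errs    map[string]int
	latency map[string]time.Duration
}

func newStats() *stats {
	return &stats{
		calls:   make(map[string]int),
		errs:    make(map[string]int),
		latency: make(map[string]time.Duration),
	}
}

func (s *stats) recordOperation(operation string, start time.Time) {
	s.calls[operation]++
	s.latency[operation] += time.Since(start)
}

func (s *stats) recordError(operation string, err error) {
	if err != nil {
		s.errs[operation]++
	}
}

// instrumented wraps another lister and records every call.
type instrumented struct {
	client lister
	stats  *stats
}

func (in instrumented) ListContainers() ([]string, error) {
	const operation = "list_containers"
	// The start time is captured now; the recording runs on return.
	defer in.stats.recordOperation(operation, time.Now())

	out, err := in.client.ListContainers()
	in.stats.recordError(operation, err)
	return out, err
}

// fakeClient is a test double that optionally fails.
type fakeClient struct{ fail bool }

func (f fakeClient) ListContainers() ([]string, error) {
	if f.fail {
		return nil, errors.New("docker unavailable")
	}
	return []string{"c1", "c2"}, nil
}

// demo makes one successful and one failing call through the wrapper.
func demo() (calls, errs int) {
	s := newStats()
	var good lister = instrumented{client: fakeClient{}, stats: s}
	var bad lister = instrumented{client: fakeClient{fail: true}, stats: s}
	good.ListContainers()
	bad.ListContainers()
	return s.calls["list_containers"], s.errs["list_containers"]
}

func main() {
	calls, errs := demo()
	fmt.Printf("calls=%d errors=%d\n", calls, errs)
}
```

Both calls are counted, and only the failing one increments the error counter.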
The individual wrappers in instrumentedDockerInterface are not covered in detail here; after all, it is DockerManager that serves as the runtime kubelet calls.
Next, let's see how the client inside instrumentedDockerInterface is created.
The client can be created by ConnectToDockerOrDie(), defined in /pkg/kubelet/dockertools/docker.go:
```go
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
	// The special "fake://" endpoint returns a fake client.
	if dockerEndpoint == "fake://" {
		return NewFakeDockerClient()
	}
	client, err := getDockerClient(dockerEndpoint)
	if err != nil {
		glog.Fatalf("Couldn't connect to docker: %v", err)
	}
	glog.Infof("Start docker client with request timeout=%v", requestTimeout)
	return newKubeDockerClient(client, requestTimeout)
}

// getDockerClient connects to the given endpoint, or falls back to the
// settings in the environment when no endpoint is given.
func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) {
	if len(dockerEndpoint) > 0 {
		glog.Infof("Connecting to docker on %s", dockerEndpoint)
		return dockerapi.NewClient(dockerEndpoint, "", nil, nil)
	}
	return dockerapi.NewEnvClient()
}
```
ConnectToDockerOrDie() calls getDockerClient() to create a client, then passes it to newKubeDockerClient() to create a new client.
newKubeDockerClient() is defined in /kube_docker_client.go:
```go
func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface {
	// Fall back to the default timeout when none is given.
	if requestTimeout == 0 {
		requestTimeout = defaultTimeout
	}

	k := &kubeDockerClient{
		client:  dockerClient,
		timeout: requestTimeout,
	}
	// Align the client's API version with the version the docker daemon reports.
	v, err := k.Version()
	if err != nil {
		glog.Errorf("failed to retrieve docker version: %v", err)
		glog.Warningf("Using empty version for docker client, this may sometimes cause compatibility issue.")
	} else {
		dockerClient.UpdateClientVersion(v.APIVersion)
	}

	return k
}
```
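newKubeDockerClient() adds a timeout mechanism on top of the native client, and kubeDockerClient provides the methods for operating on containers.
To sum up the client layering: kubeDockerClient adds timeouts on top of Docker's native client, and instrumentedDockerInterface adds metrics recording on top of kubeDockerClient.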
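The timeout layer is not shown in this post, so the following is only a sketch of how such a layer could work, assuming it is built on `context.WithTimeout` from the Go standard library. `callWithTimeout`, `slowOp`, `isTimeout`, and `demo` are hypothetical names, and the `operationTimeout` type here is a stand-in that borrows the name recordError() checks for; its real definition may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// operationTimeout is a hypothetical error type marking a timed-out call.
type operationTimeout struct{ err error }

func (e operationTimeout) Error() string { return "operation timeout: " + e.err.Error() }

// callWithTimeout runs op with a context that expires after timeout and
// converts a deadline expiry into an operationTimeout error.
func callWithTimeout(timeout time.Duration, op func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	err := op(ctx)
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		return operationTimeout{err: ctx.Err()}
	}
	return err
}

// slowOp simulates a request that takes d to complete but honours cancellation.
func slowOp(d time.Duration) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// isTimeout reports whether err is an operationTimeout, using the same
// type assertion style as recordError() above.
func isTimeout(err error) bool {
	_, ok := err.(operationTimeout)
	return ok
}

// demo runs one request that finishes in time and one that does not.
func demo() (fastTimedOut, slowTimedOut bool) {
	fast := callWithTimeout(200*time.Millisecond, slowOp(time.Millisecond))
	slow := callWithTimeout(10*time.Millisecond, slowOp(2*time.Second))
	return isTimeout(fast), isTimeout(slow)
}

func main() {
	fast, slow := demo()
	fmt.Printf("fast timed out: %v, slow timed out: %v\n", fast, slow)
}
```

Returning a distinct error type for timeouts is what lets an outer layer, such as the instrumented wrapper, count timeouts separately from other errors.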