上次介绍了containerd的执行流程,其中容器相关的工作都是调用containerd中的container或process完成的。所以本次分析就介绍containerd的container和process,看这两者是如何和containerd-shim或runc打交道的。
container
container定义在/runtime/container.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// container is the runtime package's in-memory record of one container. It
// is created by New or restored from disk by Load.
type container struct {
	// root is the directory under which the per-container state directory
	// <root>/<id> is created.
	root string
	// id identifies the container; it is passed unchanged to the runtime
	// and the shim.
	id string
	// bundle is the bundle directory; readSpec loads config.json from it.
	bundle string
	// runtime is the runtime binary that gets executed (typically runc).
	runtime string
	// runtimeArgs are the arguments placed in front of every runtime
	// sub-command.
	runtimeArgs []string
	// shim is the shim binary launched by Start and Exec.
	shim string
	// processes maps a process id to the process tracked under that id.
	processes map[string]*process
	// labels are persisted in the state file together with the bundle path.
	labels []string
	// oomFds is not referenced by the code shown here — TODO confirm its use.
	oomFds []int
	// noPivotRoot is persisted in the state file and in every process.json.
	noPivotRoot bool
	// timeout is copied from the caller; presumably the time allowed for
	// the shim to create a process — TODO confirm against waitForCreate.
	timeout time.Duration
}
|
其中:
- root: 表示containerd的主目录,如/var/run/docker/libcontainerd/containerd;
- id: 表示container的id,如nginx;
- bundle: 容器rootfs目录;
- runtime: 一般为runc;
- runtimeArgs: runtime的一般性参数,runc为空;
- shim: containerd-shim二进制文件;
- processes: 记录容器内支持的进程,进程是指容器中运行的进程;
- labels: 容器的标签;
- noPivotRoot: 暂不知道用途,目前一直是false,runc中会有pivot系统调用,到分析runc时再回过头来看;
New()
New()生成一个container,并把state信息记录在state.json文件中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func New(opts ContainerOpts) (Container, error) { c := &container{ root: opts.Root, id: opts.ID, bundle: opts.Bundle, labels: opts.Labels, processes: make(map[string]*process), runtime: opts.Runtime, runtimeArgs: opts.RuntimeArgs, shim: opts.Shim, noPivotRoot: opts.NoPivotRoot, timeout: opts.Timeout, } if err := os.Mkdir(filepath.Join(c.root, c.id), 0755); err != nil { return nil, err } f, err := os.Create(filepath.Join(c.root, c.id, StateFile)) if err != nil { return nil, err } defer f.Close() if err := json.NewEncoder(f).Encode(state{ Bundle: c.bundle, Labels: c.labels, Runtime: c.runtime, RuntimeArgs: c.runtimeArgs, Shim: c.shim, NoPivotRoot: opts.NoPivotRoot, }); err != nil { return nil, err } return c, nil }
|
Load()
Load()读取container的state.json及各进程的process.json,还原container对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| func Load(root, id, shimName string, timeout time.Duration) (Container, error) { var s state f, err := os.Open(filepath.Join(root, id, StateFile)) if err != nil { return nil, err } defer f.Close() if err := json.NewDecoder(f).Decode(&s); err != nil { return nil, err } c := &container{ root: root, id: id, bundle: s.Bundle, labels: s.Labels, runtime: s.Runtime, runtimeArgs: s.RuntimeArgs, shim: s.Shim, noPivotRoot: s.NoPivotRoot, processes: make(map[string]*process), timeout: timeout, } if c.shim == "" { c.shim = shimName } dirs, err := ioutil.ReadDir(filepath.Join(root, id)) if err != nil { return nil, err } for _, d := range dirs { if !d.IsDir() { continue } pid := d.Name() s, err := readProcessState(filepath.Join(root, id, pid)) if err != nil { return nil, err } p, err := loadProcess(filepath.Join(root, id, pid), pid, c, s) if err != nil { logrus.WithField("id", id).WithField("pid", pid).Debug("containerd: error loading process %s", err) continue } c.processes[pid] = p } return c, nil }
|
readSpec()
readSpec()读取bundle目录下的config.json文件。
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (c *container) readSpec() (*specs.Spec, error) { var spec specs.Spec f, err := os.Open(filepath.Join(c.bundle, "config.json")) if err != nil { return nil, err } defer f.Close() if err := json.NewDecoder(f).Decode(&spec); err != nil { return nil, err } return &spec, nil }
|
Delete()
Delete()先移除containerd目录下的容器目录,然后调用runc delete id
删除容器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (c *container) Delete() error { err := os.RemoveAll(filepath.Join(c.root, c.id)) args := c.runtimeArgs args = append(args, "delete", c.id) if b, derr := exec.Command(c.runtime, args...).CombinedOutput(); err != nil { err = fmt.Errorf("%s: %q", derr, string(b)) } else if len(b) > 0 { logrus.Debugf("%v %v: %q", c.runtime, args, string(b)) } return err }
|
Processes()
Processes()返回container中的processes。
1 2 3 4 5 6 7 8
| func (c *container) Processes() ([]Process, error) { out := []Process{} for _, p := range c.processes { out = append(out, p) } return out, nil }
|
RemoveProcess()
RemoveProcess()先把指定process从container的processes中移除,再删除该process的目录。在containerd中,一个process用一个目录表示。
1 2 3 4
| func (c *container) RemoveProcess(pid string) error { delete(c.processes, pid) return os.RemoveAll(filepath.Join(c.root, c.id, pid)) }
|
State()
State()返回init进程的state。
1 2 3 4 5 6 7 8
| func (c *container) State() State { proc := c.processes["init"] if proc == nil { return Stopped } return proc.State() }
|
Pause()
Pause()挂起某一容器。
1 2 3 4 5 6 7 8 9 10
| func (c *container) Pause() error { args := c.runtimeArgs args = append(args, "pause", c.id) b, err := exec.Command(c.runtime, args...).CombinedOutput() if err != nil { return fmt.Errorf("%s: %q", err.Error(), string(b)) } return nil }
|
Resume()
与Pause()相对应,Resume()恢复某一容器。
1 2 3 4 5 6 7 8 9 10
| func (c *container) Resume() error { args := c.runtimeArgs args = append(args, "resume", c.id) b, err := exec.Command(c.runtime, args...).CombinedOutput() if err != nil { return fmt.Errorf("%s: %q", err.Error(), string(b)) } return nil }
|
Start()
Start()的流程如下:
- 生成命令:
shim id bundle runc
,工作目录为process目录;
- 读取容器config.json文件,生成init process;
- 调用createCmd()启动
shim id bundle runc
;
- 返回init process。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func (c *container) Start(checkpointPath string, s Stdio) (Process, error) { processRoot := filepath.Join(c.root, c.id, InitProcessID) if err := os.Mkdir(processRoot, 0755); err != nil { return nil, err } cmd := exec.Command(c.shim, c.id, c.bundle, c.runtime, ) cmd.Dir = processRoot cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, } spec, err := c.readSpec() if err != nil { return nil, err } config := &processConfig{ checkpoint: checkpointPath, root: processRoot, id: InitProcessID, c: c, stdio: s, spec: spec, processSpec: specs.ProcessSpec(spec.Process), } p, err := newProcess(config) if err != nil { return nil, err } if err := c.createCmd(InitProcessID, cmd, p); err != nil { return nil, err } return p, nil }
|
Exec()
Exec()流程和Start()基本一致,只是生成的process不同。
由于传shim的工作目录为process目录,所以shim可以根据process.json判断出是Start()还是Exec(),并作出相应的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) { processRoot := filepath.Join(c.root, c.id, pid) if err := os.Mkdir(processRoot, 0755); err != nil { return nil, err } defer func() { if err != nil { c.RemoveProcess(pid) } }() cmd := exec.Command(c.shim, c.id, c.bundle, c.runtime, ) cmd.Dir = processRoot cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, } spec, err := c.readSpec() if err != nil { return nil, err } config := &processConfig{ exec: true, id: pid, root: processRoot, c: c, processSpec: pspec, spec: spec, stdio: s, } p, err := newProcess(config) if err != nil { return nil, err } if err := c.createCmd(pid, cmd, p); err != nil { return nil, err } return p, nil }
|
createCmd()
createCmd()会执行shim命令。shim启动成功后,createCmd会在返回时(即等待容器内进程创建的waitForCreate结束之后)启动一个goroutine来等待shim命令结束。shim命令一般不会退出。当shim退出时,如果容器内的进程仍在运行,则需要把该进程杀死;如果容器内进程已经不存在,则无需清理工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
// createCmd starts cmd (the shim command) for process p and, once
// waitForCreate succeeds, registers p in c.processes under pid.
//
// If the command cannot be started, p.cmdDoneCh is closed and an error is
// returned. A missing shim binary is reported as "<shim> not installed on
// system".
//
// If the command was started, a goroutine is launched when createCmd returns
// (whether or not waitForCreate succeeded). It waits for the shim to exit,
// kills the process the shim left behind if there is one, and then closes
// p.cmdDoneCh.
func (c *container) createCmd(pid string, cmd *exec.Cmd, p *process) error {
	p.cmd = cmd
	if err := cmd.Start(); err != nil {
		// Nobody will ever wait on the command, so signal completion now.
		close(p.cmdDoneCh)
		if exErr, ok := err.(*exec.Error); ok {
			if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist {
				return fmt.Errorf("%s not installed on system", c.shim)
			}
		}
		return err
	}
	// The goroutine is started from a defer, i.e. only after waitForCreate
	// below has returned.
	defer func() {
		go func() {
			err := p.cmd.Wait()
			if err == nil {
				p.cmdSuccess = true
			}
			// The shim is gone. The error from isSameProcess is not
			// inspected; only the boolean result and p.pid decide whether
			// an orphan is killed.
			if same, err := p.isSameProcess(); same && p.pid > 0 {
				logrus.Infof("containerd: %s:%s (pid %v) has become an orphan, killing it", p.container.id, p.id, p.pid)
				err = unix.Kill(p.pid, syscall.SIGKILL)
				if err != nil && err != syscall.ESRCH {
					logrus.Errorf("containerd: unable to SIGKILL %s:%s (pid %v): %v", p.container.id, p.id, p.pid, err)
				} else {
					// Poll with signal 0 until signalling the pid fails,
					// which is taken to mean the process is gone.
					for {
						err = unix.Kill(p.pid, 0)
						if err != nil {
							break
						}
						time.Sleep(5 * time.Millisecond)
					}
				}
			}
			close(p.cmdDoneCh)
		}()
	}()
	if err := c.waitForCreate(p, cmd); err != nil {
		return err
	}
	c.processes[pid] = p
	return nil
}
|
Pids()
Pids()返回容器中的进程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (c *container) Pids() ([]int, error) { args := c.runtimeArgs args = append(args, "ps", "--format=json", c.id) out, err := exec.Command(c.runtime, args...).CombinedOutput() if err != nil { return nil, fmt.Errorf("%s: %q", err.Error(), out) } var pids []int if err := json.Unmarshal(out, &pids); err != nil { return nil, err } return pids, nil }
|
Stats()
Stats()通过调用runc events --stats nginx
获取容器的监控信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (c *container) Stats() (*Stat, error) { now := time.Now() args := c.runtimeArgs args = append(args, "events", "--stats", c.id) out, err := exec.Command(c.runtime, args...).CombinedOutput() if err != nil { return nil, fmt.Errorf("%s: %q", err.Error(), out) } s := struct { Data *Stat `json:"data"` }{} if err := json.Unmarshal(out, &s); err != nil { return nil, err } s.Data.Timestamp = now return s.Data, nil }
|
Status()
Status()通过runc state id
获取容器的状态信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func (c *container) Status() (State, error) { args := c.runtimeArgs args = append(args, "state", c.id) out, err := exec.Command(c.runtime, args...).CombinedOutput() if err != nil { return "", fmt.Errorf("%s: %q", err.Error(), out) } var s struct { Status State `json:"status"` } if err := json.Unmarshal(out, &s); err != nil { return "", err } return s.Status, nil }
|
Process
Process定义在/runtime/process.go中,表示容器内部运行的一个进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// process is the runtime package's record of one process of a container. It
// is created by newProcess or restored from disk by loadProcess.
type process struct {
	// root is the process directory; process.json and the exit and control
	// pipes live in it.
	root string
	// id is the process id within the container (InitProcessID for the
	// init process).
	id string
	// pid is the system pid that is signalled and whose /proc stat entry
	// is read.
	pid int
	// exitPipe is the file returned by getExitPipe for ExitFile in root.
	exitPipe *os.File
	// controlPipe is the file returned by getControlPipe for ControlFile
	// in root.
	controlPipe *os.File
	// container is the container this process belongs to.
	container *container
	// spec is the process spec that was written to process.json.
	spec specs.ProcessSpec
	// stdio holds the stdin, stdout and stderr values recorded in
	// process.json.
	stdio Stdio
	// cmd is the shim command; it is set by createCmd.
	cmd *exec.Cmd
	// cmdSuccess is set to true when waiting on cmd returned no error.
	cmdSuccess bool
	// cmdDoneCh is closed by createCmd when the shim could not be started,
	// or after it has exited and any orphaned process has been dealt with.
	cmdDoneCh chan struct{}
	// state is Running for new processes and Stopped or Running for loaded
	// ones.
	state State
	// stateLock presumably guards state — TODO confirm; it is not used in
	// the code shown here.
	stateLock sync.Mutex
	// startTime is the content of StartTimeFile; isSameProcess compares it
	// with the start time read from /proc.
	startTime string
}
|
newProcess()
newProcess()的流程如下:
- 生成process;
- 创建process.json
- 生成ProcessState并写入process.json
- 创建exit和control以和shim交互;
- 返回process。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| func newProcess(config *processConfig) (*process, error) { p := &process{ root: config.root, id: config.id, container: config.c, spec: config.processSpec, stdio: config.stdio, cmdDoneCh: make(chan struct{}), state: Running, } uid, gid, err := getRootIDs(config.spec) if err != nil { return nil, err } f, err := os.Create(filepath.Join(config.root, "process.json")) if err != nil { return nil, err } defer f.Close() ps := ProcessState{ ProcessSpec: config.processSpec, Exec: config.exec, PlatformProcessState: PlatformProcessState{ Checkpoint: config.checkpoint, RootUID: uid, RootGID: gid, }, Stdin: config.stdio.Stdin, Stdout: config.stdio.Stdout, Stderr: config.stdio.Stderr, RuntimeArgs: config.c.runtimeArgs, NoPivotRoot: config.c.noPivotRoot, } if err := json.NewEncoder(f).Encode(ps); err != nil { return nil, err } exit, err := getExitPipe(filepath.Join(config.root, ExitFile)) if err != nil { return nil, err } control, err := getControlPipe(filepath.Join(config.root, ControlFile)) if err != nil { return nil, err } p.exitPipe = exit p.controlPipe = control return p, nil }
|
loadProcess()
loadProcess()读取process.json,并还原成process。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func loadProcess(root, id string, c *container, s *ProcessState) (*process, error) { p := &process{ root: root, id: id, container: c, spec: s.ProcessSpec, stdio: Stdio{ Stdin: s.Stdin, Stdout: s.Stdout, Stderr: s.Stderr, }, state: Stopped, } startTime, err := ioutil.ReadFile(filepath.Join(p.root, StartTimeFile)) if err != nil && !os.IsNotExist(err) { return nil, err } p.startTime = string(startTime) if _, err := p.getPidFromFile(); err != nil { return nil, err } if _, err := p.ExitStatus(); err != nil { if err == ErrProcessNotExited { exit, err := getExitPipe(filepath.Join(root, ExitFile)) if err != nil { return nil, err } p.exitPipe = exit control, err := getControlPipe(filepath.Join(root, ControlFile)) if err != nil { return nil, err } p.controlPipe = control p.state = Running return p, nil } return nil, err } return p, nil }
|
readProcStatField()
readProcStatField()从/proc/pid/stat中读取指定信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
// readProcStatField returns one field of /proc/<pid>/stat.
//
// field is the 1-based field number as used in proc(5): 1 is the pid, 2 the
// command name (returned without the surrounding parentheses), 3 the state,
// and so on.
//
// The command name may itself contain spaces and parentheses, so it is
// delimited by the first "(" and the last ")" of the line. An error is
// returned if the file cannot be read, if the line is malformed, or if field
// is outside the range of fields present.
func readProcStatField(pid int, field int) (string, error) {
	if field < 1 {
		return "", fmt.Errorf("invalid stat field %d", field)
	}
	data, err := ioutil.ReadFile(filepath.Join(string(filepath.Separator), "proc", strconv.Itoa(pid), "stat"))
	if err != nil {
		return "", err
	}
	stat := string(data)

	open := strings.IndexByte(stat, '(')
	end := strings.LastIndexByte(stat, ')')
	if open < 0 || end < open {
		return "", fmt.Errorf("malformed stat data for pid %d: %q", pid, stat)
	}

	switch field {
	case 1:
		return strings.TrimSpace(stat[:open]), nil
	case 2:
		return stat[open+1 : end], nil
	}

	// Everything after the command name is whitespace separated; Fields
	// also strips the newline that terminates the line.
	rest := strings.Fields(stat[end+1:])
	if idx := field - 3; idx < len(rest) {
		return rest[idx], nil
	}
	return "", fmt.Errorf("stat for pid %d has no field %d", pid, field)
}
|
readStartTime()
readStartTime()从系统中读取指定进程的启动时间。
1 2 3 4
| func (p *process) readStartTime() (string, error) { return readProcStatField(p.pid, 22) }
|
isSameProcess()
isSameProcess()把process中记录的启动时间与从系统中读到的该pid的启动时间进行比较:如果process没有记录启动时间,则直接认为是同一进程;如果读取系统中的启动时间失败(例如进程已不存在),则返回错误;如果两个启动时间一致,则说明系统中该进程仍在运行。shim退出时可以用isSameProcess()来判断系统中是否有进程残留。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (p *process) isSameProcess() (bool, error) { if p.startTime == "" { return true, nil } if p.pid == 0 { _, err := p.getPidFromFile() if err != nil { return false, err } } startTime, err := p.readStartTime() if err != nil { return false, err } return startTime == p.startTime, nil }
|
Signal()
Signal()可以向process发送信号。
1 2 3 4
| func (p *process) Signal(s os.Signal) error { return syscall.Kill(p.pid, s.(syscall.Signal)) }
|
Start()
Start()会调用runc start id
来启动一个容器。
container的Start()最终调用的是runc create(通过shim调用),而process的Start()调用的是runc start。runc create和runc start两个命令配合,共同完成容器的启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| func (p *process) Start() error { if p.ID() == InitProcessID { var ( errC = make(chan error, 1) args = append(p.container.runtimeArgs, "start", p.container.id) cmd = exec.Command(p.container.runtime, args...) ) go func() { out, err := cmd.CombinedOutput() if err != nil { errC <- fmt.Errorf("%s: %q", err.Error(), out) } errC <- nil }() select { case err := <-errC: if err != nil { return err } case <-p.cmdDoneCh: if !p.cmdSuccess { if cmd.Process != nil { cmd.Process.Kill() } cmd.Wait() return ErrShimExited } err := <-errC if err != nil { return err } } } return nil }
|
总结
Container对应的是容器,Process对应的是容器中的进程。Container的Start()和Exec()会调用containerd-shim,containerd-shim一般是个常驻进程,Container在containerd-shim退出时需要做清理工作。如果containerd-shim已经退出,但容器内的process还在执行,那么container会先用SIGKILL杀死该进程,等它退出后再关闭cmdDoneCh,以通知等待方shim已经结束。