上次介绍了containerd的执行流程,其中容器相关的工作都是调用containerd中的container或process完成的。所以本次分析就介绍containerd的container和process,看这两者是如何和containerd-shim或runc打交道的。

container

container定义在/runtime/container.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// container is the in-memory record containerd keeps for a single container.
type container struct {
// path to store runtime state information
root string
// id identifies the container; the per-container state directory is root/id.
id string
// bundle is the bundle directory; readSpec loads config.json from it.
bundle string
// runtime is the runtime binary executed for delete/pause/resume/ps/events/state.
runtime string
// runtimeArgs are placed before the sub-command on every runtime invocation.
runtimeArgs []string
// shim is the shim binary launched by Start and Exec.
shim string
// processes maps a process id (the init process uses "init") to its process.
processes map[string]*process
// labels are persisted to the state file by New and restored by Load.
labels []string
oomFds []int
// noPivotRoot is persisted to the state file and copied into each process state.
noPivotRoot bool
// timeout is supplied by the caller of New/Load.
// NOTE(review): presumably bounds waiting for process creation — confirm in waitForCreate.
timeout time.Duration
}

其中:

  • root: 表示containerd的主目录,如/var/run/docker/libcontainerd/containerd;
  • id: 表示container的id,如nginx;
  • bundle: 容器rootfs目录;
  • runtime: 一般为runc;
  • runtimeArgs: runtime的一般性参数,runc为空;
  • shim: containerd-shim二进制文件;
  • processes: 记录容器内支持的进程,进程是指容器中运行的进程;
  • labels: 容器的标签;
  • noPivotRoot: 暂不清楚具体用途,目前一直是false;runc中会用到pivot_root系统调用,到分析runc时再回过头来看;

New()

New()生成一个container,并把state信息记录在state.json文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// New builds a container from opts, creates its state directory root/id and
// records the persistent settings in the state file inside that directory.
// It fails if the directory cannot be created (for example because it already
// exists) or if the state file cannot be written.
func New(opts ContainerOpts) (Container, error) {
	ctr := &container{
		root:        opts.Root,
		id:          opts.ID,
		bundle:      opts.Bundle,
		runtime:     opts.Runtime,
		runtimeArgs: opts.RuntimeArgs,
		shim:        opts.Shim,
		labels:      opts.Labels,
		noPivotRoot: opts.NoPivotRoot,
		timeout:     opts.Timeout,
		processes:   make(map[string]*process),
	}

	stateDir := filepath.Join(ctr.root, ctr.id)
	if err := os.Mkdir(stateDir, 0755); err != nil {
		return nil, err
	}

	// StateFile is "state.json".
	stateFile, err := os.Create(filepath.Join(stateDir, StateFile))
	if err != nil {
		return nil, err
	}
	defer stateFile.Close()

	// Only the settings needed to rebuild the container later are persisted.
	persisted := state{
		Bundle:      ctr.bundle,
		Labels:      ctr.labels,
		Runtime:     ctr.runtime,
		RuntimeArgs: ctr.runtimeArgs,
		Shim:        ctr.shim,
		NoPivotRoot: opts.NoPivotRoot,
	}
	if err := json.NewEncoder(stateFile).Encode(persisted); err != nil {
		return nil, err
	}
	return ctr, nil
}

Load()

Load()读取container的state.json及各进程的process.json,还原container对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Load returns a container rebuilt from the matching state file on disk
// (root/id/state.json) together with every process directory found next to it.
//
// shimName is used only when the persisted state does not name a shim.
// Each sub-directory of root/id is treated as one process: a process whose
// process state cannot be read aborts the load, while a process that fails to
// load is logged at debug level and skipped.
func Load(root, id, shimName string, timeout time.Duration) (Container, error) {
	var s state
	// StateFile is "state.json".
	f, err := os.Open(filepath.Join(root, id, StateFile))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	c := &container{
		root:        root,
		id:          id,
		bundle:      s.Bundle,
		labels:      s.Labels,
		runtime:     s.Runtime,
		runtimeArgs: s.RuntimeArgs,
		shim:        s.Shim,
		noPivotRoot: s.NoPivotRoot,
		processes:   make(map[string]*process),
		timeout:     timeout,
	}
	if c.shim == "" {
		c.shim = shimName
	}
	dirs, err := ioutil.ReadDir(filepath.Join(root, id))
	if err != nil {
		return nil, err
	}
	// One directory per process; plain files (such as the state file) are skipped.
	for _, d := range dirs {
		if !d.IsDir() {
			continue
		}
		pid := d.Name()
		processRoot := filepath.Join(root, id, pid)
		// Named ps so it does not shadow the container state s above.
		ps, err := readProcessState(processRoot)
		if err != nil {
			return nil, err
		}
		p, err := loadProcess(processRoot, pid, c, ps)
		if err != nil {
			// Debugf rather than Debug: the message contains a format verb.
			logrus.WithField("id", id).WithField("pid", pid).Debugf("containerd: error loading process %s", err)
			continue
		}
		c.processes[pid] = p
	}
	return c, nil
}

readSpec()

readSpec()读取bundle目录下的config.json文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
// readSpec opens config.json in the container's bundle directory and decodes
// it into a runtime spec.
func (c *container) readSpec() (*specs.Spec, error) {
	configFile, err := os.Open(filepath.Join(c.bundle, "config.json"))
	if err != nil {
		return nil, err
	}
	defer configFile.Close()

	spec := new(specs.Spec)
	if err := json.NewDecoder(configFile).Decode(spec); err != nil {
		return nil, err
	}
	return spec, nil
}

Delete()

Delete()先移除containerd目录下的容器目录,然后调用runc delete id删除容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Delete removes the container's state directory (root/id) and then runs
// "<runtime> <runtimeArgs...> delete <id>".
//
// Both steps are always attempted. If the runtime command fails, the returned
// error carries the command error and its combined output; if removing the
// state directory also failed, both failures are reported. When the runtime
// command succeeds, the result of removing the state directory is returned.
func (c *container) Delete() error {
	// Remove the on-disk state directory first.
	err := os.RemoveAll(filepath.Join(c.root, c.id))

	args := c.runtimeArgs
	args = append(args, "delete", c.id)
	b, derr := exec.Command(c.runtime, args...).CombinedOutput()
	if derr != nil {
		// The runtime's own error (derr) decides this branch; testing the
		// RemoveAll error here would hide runtime failures.
		rerr := fmt.Errorf("%s: %q", derr, string(b))
		if err != nil {
			return fmt.Errorf("%v; %v", err, rerr)
		}
		return rerr
	}
	if len(b) > 0 {
		logrus.Debugf("%v %v: %q", c.runtime, args, string(b))
	}
	return err
}

Processes()

Processes()返回container中的processes。

1
2
3
4
5
6
7
8
// Processes returns every process currently tracked for the container.
// The result is never nil and its order is unspecified (map iteration order).
func (c *container) Processes() ([]Process, error) {
	list := make([]Process, 0, len(c.processes))
	for id := range c.processes {
		list = append(list, c.processes[id])
	}
	return list, nil
}

RemoveProcess()

RemoveProcess()从container的processes中移除指定的process,并删除该process对应的目录。在containerd中,一个process用一个目录表示。

1
2
3
4
// RemoveProcess forgets the process registered under pid and deletes its
// state directory root/id/pid. It returns the error from removing the
// directory, if any.
func (c *container) RemoveProcess(pid string) error {
	processDir := filepath.Join(c.root, c.id, pid)
	delete(c.processes, pid)
	return os.RemoveAll(processDir)
}

State()

State()返回init进程的state。

1
2
3
4
5
6
7
8
// State reports the state of the container's init process, or Stopped when
// no init process is tracked.
func (c *container) State() State {
	if initProc := c.processes["init"]; initProc != nil {
		return initProc.State()
	}
	return Stopped
}

Pause()

Pause()挂起某一容器。

1
2
3
4
5
6
7
8
9
10
// Pause suspends the container by running
// "<runtime> <runtimeArgs...> pause <id>". On failure the returned error
// contains the command error and the command's combined output.
func (c *container) Pause() error {
	args := append(c.runtimeArgs, "pause", c.id)
	if out, err := exec.Command(c.runtime, args...).CombinedOutput(); err != nil {
		return fmt.Errorf("%s: %q", err.Error(), string(out))
	}
	return nil
}

Resume()

与Pause()相对应,Resume()恢复某一容器。

1
2
3
4
5
6
7
8
9
10
// Resume is the counterpart of Pause: it runs
// "<runtime> <runtimeArgs...> resume <id>". On failure the returned error
// contains the command error and the command's combined output.
func (c *container) Resume() error {
	args := append(c.runtimeArgs, "resume", c.id)
	if out, err := exec.Command(c.runtime, args...).CombinedOutput(); err != nil {
		return fmt.Errorf("%s: %q", err.Error(), string(out))
	}
	return nil
}

Start()

Start()的流程如下:

  1. 生成命令:shim id bundle runc,工作目录为process目录;
  2. 读取容器config.json文件,生成init process;
  3. 调用createCmd()启动shim id bundle runc
  4. 返回init process。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Start creates the init process of the container.
//
// It creates the init process directory root/id/init, prepares the shim
// command "<shim> <id> <bundle> <runtime>" with that directory as its working
// directory, reads the bundle's config.json, builds the init process from it
// and finally launches the shim through createCmd. The created process is
// returned.
func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
	// InitProcessID is "init", so this is e.g. <root>/<id>/init.
	initRoot := filepath.Join(c.root, c.id, InitProcessID)
	if err := os.Mkdir(initRoot, 0755); err != nil {
		return nil, err
	}

	// The shim is started in its own process group, inside the process directory.
	shimCmd := exec.Command(c.shim, c.id, c.bundle, c.runtime)
	shimCmd.Dir = initRoot
	shimCmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}

	initProc, err := newProcess(&processConfig{
		id:          InitProcessID,
		root:        initRoot,
		c:           c,
		checkpoint:  checkpointPath,
		stdio:       s,
		spec:        spec,
		processSpec: specs.ProcessSpec(spec.Process),
	})
	if err != nil {
		return nil, err
	}

	if err := c.createCmd(InitProcessID, shimCmd, initProc); err != nil {
		return nil, err
	}
	return initProc, nil
}

Exec()

Exec()流程和Start()基本一致,只是生成的process不同。
由于传给shim的工作目录为process目录,所以shim可以根据该目录下的process.json判断出是Start()还是Exec(),并作出相应的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Exec starts an additional process, identified by pid, inside the container.
//
// The flow mirrors Start: a process directory root/id/pid is created, the
// shim command "<shim> <id> <bundle> <runtime>" is prepared with that
// directory as working directory, and the process is built with exec set to
// true and the caller-supplied process spec. If anything fails after the
// directory has been created, the process is removed again.
func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) {
	execRoot := filepath.Join(c.root, c.id, pid)
	if mkErr := os.Mkdir(execRoot, 0755); mkErr != nil {
		return nil, mkErr
	}
	// Registered only after the directory was created by this call, so a
	// failed Mkdir never removes a directory that belongs to another process.
	defer func() {
		if err == nil {
			return
		}
		c.RemoveProcess(pid)
	}()

	// Exec also goes through the shim, started in its own process group.
	shimCmd := exec.Command(c.shim, c.id, c.bundle, c.runtime)
	shimCmd.Dir = execRoot
	shimCmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}

	// exec: true marks this as an exec'd process rather than the init process.
	proc, err := newProcess(&processConfig{
		exec:        true,
		id:          pid,
		root:        execRoot,
		c:           c,
		stdio:       s,
		spec:        spec,
		processSpec: pspec,
	})
	if err != nil {
		return nil, err
	}

	if err = c.createCmd(pid, shimCmd, proc); err != nil {
		return nil, err
	}
	return proc, nil
}

createCmd()

createCmd()负责执行shim命令。createCmd()返回时(即等待容器内进程创建完成之后,进程的pid由runc生成),会启动一个goroutine来等待shim命令结束。shim命令一般不会退出;当shim退出时,如果容器内的进程仍在运行,则需要把该进程杀死;如果容器内的进程已经不存在,则无需做清理工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// createCmd starts the shim command cmd for process p, waits until the
// process has been created and then registers p under pid in c.processes.
//
// If the command cannot be started, p.cmdDoneCh is closed and an error is
// returned; a missing shim binary is reported as "<shim> not installed on
// system". Once the command has started, a goroutine is launched when
// createCmd returns (on success or failure of waitForCreate). It waits for
// the shim to exit, kills the process if it is still alive, and finally
// closes p.cmdDoneCh.
func (c *container) createCmd(pid string, cmd *exec.Cmd, p *process) error {
p.cmd = cmd
// Launch the shim.
if err := cmd.Start(); err != nil {
close(p.cmdDoneCh)
if exErr, ok := err.(*exec.Error); ok {
if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist {
return fmt.Errorf("%s not installed on system", c.shim)
}
}
return err
}
// We need the pid file to have been written to run
// The watcher goroutine is therefore started from a defer, i.e. only after
// waitForCreate below has returned.
defer func() {
// Wait for the shim to exit in the background.
go func() {
// Block until the shim command has finished.
err := p.cmd.Wait()
if err == nil {
p.cmdSuccess = true
}
// Everything below is the clean-up performed when the shim exits.
// isSameProcess compares the recorded start time with the start time the
// system currently reports for p.pid. Per the original author's notes: after
// a normal exit the process is gone, while after an abnormal shim exit (for
// example the shim being killed with SIGKILL) the process may still be
// running and same is true.
// NOTE(review): the error returned by isSameProcess is not checked here.
if same, err := p.isSameProcess(); same && p.pid > 0 {
// The process changed its PR_SET_PDEATHSIG, so force
// kill it
logrus.Infof("containerd: %s:%s (pid %v) has become an orphan, killing it", p.container.id, p.id, p.pid)
err = unix.Kill(p.pid, syscall.SIGKILL)
if err != nil && err != syscall.ESRCH {
logrus.Errorf("containerd: unable to SIGKILL %s:%s (pid %v): %v", p.container.id, p.id, p.pid, err)
} else {
// Poll with signal 0 until the kill call fails, which is taken to mean
// the process is gone.
for {
err = unix.Kill(p.pid, 0)
if err != nil {
break
}
time.Sleep(5 * time.Millisecond)
}
}
}
// Tell waiters (such as process.Start) that the shim command is done.
close(p.cmdDoneCh)
}()
}()
// Wait for the process to be created.
if err := c.waitForCreate(p, cmd); err != nil {
return err
}
c.processes[pid] = p
return nil
}

Pids()

Pids()返回容器中所有进程的pid。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Pids lists the pids of the processes in the container by running
// "<runtime> <runtimeArgs...> ps --format=json <id>" and decoding the JSON
// array it prints. A failing command is reported together with its combined
// output.
func (c *container) Pids() ([]int, error) {
	args := append(c.runtimeArgs, "ps", "--format=json", c.id)
	raw, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("%s: %q", err.Error(), raw)
	}

	var pids []int
	if err = json.Unmarshal(raw, &pids); err != nil {
		return nil, err
	}
	return pids, nil
}

Stats()

Stats()通过调用runc events --stats nginx获取容器的监控信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Stats collects the container's statistics by running
// "<runtime> <runtimeArgs...> events --stats <id>" and decoding the "data"
// object of the JSON it prints. The returned Stat is stamped with the time
// taken just before the command was run.
//
// An error is returned when the command fails (including its combined
// output), when the output is not valid JSON, or when the output carries no
// "data" object.
func (c *container) Stats() (*Stat, error) {
	now := time.Now()
	args := c.runtimeArgs
	args = append(args, "events", "--stats", c.id)
	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("%s: %q", err.Error(), out)
	}
	s := struct {
		Data *Stat `json:"data"`
	}{}
	if err := json.Unmarshal(out, &s); err != nil {
		return nil, err
	}
	// Data stays nil when the output has no "data" object (or it is null);
	// report that instead of dereferencing a nil pointer below.
	if s.Data == nil {
		return nil, fmt.Errorf("%s %v: no stats data in output %q", c.runtime, args, out)
	}
	s.Data.Timestamp = now
	return s.Data, nil
}

Status()

Status()通过runc state id获取容器的状态信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Status asks the runtime for the container's state by running
// "<runtime> <runtimeArgs...> state <id>" and returns the top-level "status"
// field of its JSON output. Example output of "runc state nginx":
//
//	{
//	  "ociVersion": "1.0.0-rc2-dev",
//	  "id": "nginx",
//	  "pid": 2416,
//	  "status": "running",
//	  "bundle": "/home/fankang/mycontainer",
//	  "rootfs": "/home/fankang/mycontainer/rootfs",
//	  "created": "2017-11-19T07:15:34.567151194Z"
//	}
func (c *container) Status() (State, error) {
	args := append(c.runtimeArgs, "state", c.id)
	raw, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("%s: %q", err.Error(), raw)
	}

	// Only the top-level status field is required from the runtime's output.
	report := struct {
		Status State `json:"status"`
	}{}
	if err := json.Unmarshal(raw, &report); err != nil {
		return "", err
	}
	return report.Status, nil
}

Process

Process定义在/runtime/process.go中,表示容器内部运行的一个进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// process is the in-memory record of one process running inside a container.
type process struct {
// root is the process state directory; process.json and the exit and
// control files are created in it.
root string
// id identifies the process within its container; the init process uses "init".
id string
// pid is the system pid used for signalling and for /proc/<pid>/stat lookups.
pid int
// exitPipe and controlPipe are opened from the "exit" and "control" files
// under root.
exitPipe *os.File
controlPipe *os.File
// container is the container this process belongs to.
container *container
spec specs.ProcessSpec
stdio Stdio
// cmd is the shim command started by container.createCmd.
cmd *exec.Cmd
// cmdSuccess is set once cmd.Wait has returned without an error.
cmdSuccess bool
// cmdDoneCh is closed when the shim command has finished or failed to start.
cmdDoneCh chan struct{}
state State
// NOTE(review): presumably guards state — confirm against State()/setters.
stateLock sync.Mutex
// startTime is the start time read from StartTimeFile; isSameProcess
// compares it with the value currently reported by /proc/<pid>/stat.
startTime string
}

newProcess()

newProcess()的流程如下:

  1. 生成process;
  2. 创建process.json
  3. 生成ProcessState并写入process.json
  4. 创建exit和control以和shim交互;
  5. 返回process。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// newProcess builds the in-memory process described by config and prepares
// its on-disk state under config.root:
//
//  1. process.json is created and filled with the ProcessState (process spec,
//     exec flag, checkpoint, root uid/gid, stdio paths, runtime arguments and
//     the no-pivot-root flag);
//  2. the exit and control pipes used to talk to the shim are opened.
//
// The returned process starts in the Running state. On failure no process is
// returned and any pipe opened by this call is closed again.
func newProcess(config *processConfig) (*process, error) {
	p := &process{
		root:      config.root,
		id:        config.id,
		container: config.c,
		spec:      config.processSpec,
		stdio:     config.stdio,
		cmdDoneCh: make(chan struct{}),
		state:     Running,
	}
	uid, gid, err := getRootIDs(config.spec)
	if err != nil {
		return nil, err
	}
	// config.root is the process directory, e.g. <root>/<container id>/init.
	f, err := os.Create(filepath.Join(config.root, "process.json"))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	ps := ProcessState{
		ProcessSpec: config.processSpec,
		Exec:        config.exec,
		PlatformProcessState: PlatformProcessState{
			Checkpoint: config.checkpoint,
			RootUID:    uid,
			RootGID:    gid,
		},
		Stdin:       config.stdio.Stdin,
		Stdout:      config.stdio.Stdout,
		Stderr:      config.stdio.Stderr,
		RuntimeArgs: config.c.runtimeArgs,
		NoPivotRoot: config.c.noPivotRoot,
	}
	if err := json.NewEncoder(f).Encode(ps); err != nil {
		return nil, err
	}
	// ExitFile is "exit".
	exit, err := getExitPipe(filepath.Join(config.root, ExitFile))
	if err != nil {
		return nil, err
	}
	// ControlFile is "control".
	control, err := getControlPipe(filepath.Join(config.root, ControlFile))
	if err != nil {
		// The exit pipe is already open; close it so the descriptor is not
		// leaked when the process cannot be created. The close error is
		// ignored because the original failure is the one worth reporting.
		exit.Close()
		return nil, err
	}
	p.exitPipe = exit
	p.controlPipe = control
	return p, nil
}

loadProcess()

loadProcess()读取process.json,并还原成process。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// loadProcess rebuilds a process from its persisted state s and the files in
// its directory root.
//
// The start time file is optional; a missing file leaves startTime empty. The
// pid is read from the pid file. If the process has already exited, it is
// returned in the Stopped state without pipes. If it has not exited yet
// (ExitStatus reports ErrProcessNotExited), the exit and control pipes are
// reopened and the process is returned in the Running state. Any other error
// aborts the load, and a pipe opened by this call is closed again on failure.
func loadProcess(root, id string, c *container, s *ProcessState) (*process, error) {
	p := &process{
		root:      root,
		id:        id,
		container: c,
		spec:      s.ProcessSpec,
		stdio: Stdio{
			Stdin:  s.Stdin,
			Stdout: s.Stdout,
			Stderr: s.Stderr,
		},
		state: Stopped,
	}
	startTime, err := ioutil.ReadFile(filepath.Join(p.root, StartTimeFile))
	if err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	p.startTime = string(startTime)
	if _, err := p.getPidFromFile(); err != nil {
		return nil, err
	}

	_, err = p.ExitStatus()
	if err == nil {
		// An exit status is available: the process is done and stays Stopped.
		return p, nil
	}
	if err != ErrProcessNotExited {
		return nil, err
	}

	// Still running: reconnect the pipes shared with the shim.
	exit, err := getExitPipe(filepath.Join(root, ExitFile))
	if err != nil {
		return nil, err
	}
	control, err := getControlPipe(filepath.Join(root, ControlFile))
	if err != nil {
		// Do not leak the exit pipe opened just above; its close error is
		// ignored in favour of the original failure.
		exit.Close()
		return nil, err
	}
	p.exitPipe = exit
	p.controlPipe = control
	p.state = Running
	return p, nil
}

readProcStatField()

readProcStatField()从/proc/pid/stat中读取指定信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// readProcStatField returns one field of /proc/<pid>/stat.
//
// field is 1-based, as in the proc manual: 1 is the pid, 2 is the command
// name (returned without the surrounding parentheses) and every later field
// is counted after the command name. The command name may itself contain
// spaces and parentheses, so it is delimited by the first "(" and the last
// ")" of the file rather than by splitting on spaces.
//
// An error is returned if the file cannot be read, if its content does not
// have the expected shape, or if field is outside the range of fields present.
func readProcStatField(pid int, field int) (string, error) {
	if field < 1 {
		return "", fmt.Errorf("invalid stat field %d: fields are numbered from 1", field)
	}
	data, err := ioutil.ReadFile(filepath.Join(string(filepath.Separator), "proc", strconv.Itoa(pid), "stat"))
	if err != nil {
		return "", err
	}
	stat := string(data)

	open := strings.Index(stat, "(")
	end := strings.LastIndex(stat, ")")
	if open < 0 || end < open {
		return "", fmt.Errorf("malformed stat data for pid %d", pid)
	}

	switch field {
	case 1:
		return strings.TrimSpace(stat[:open]), nil
	case 2:
		return stat[open+1 : end], nil
	}

	// Everything after the command name is space separated; Fields also
	// drops the trailing newline from the last entry.
	rest := strings.Fields(stat[end+1:])
	idx := field - 3 // rest[0] is field 3
	if idx >= len(rest) {
		return "", fmt.Errorf("stat for pid %d has no field %d", pid, field)
	}
	return rest[idx], nil
}

readStartTime()

readStartTime()从系统中读取指定进程的启动时间。

1
2
3
4
// readStartTime returns the start time of the process as reported by field 22
// of /proc/<pid>/stat.
func (p *process) readStartTime() (string, error) {
	const startTimeField = 22
	return readProcStatField(p.pid, startTimeField)
}

isSameProcess()

isSameProcess()先取内存中process记录的启动时间,再从系统中读取该进程的启动时间(如果系统中的进程已不存在,则读取失败),然后比较两个启动时间:如果一致,则说明系统中该进程还在运行。shim退出时可以用isSameProcess()来判断系统中是否有进程残留。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// isSameProcess reports whether the process currently running under p.pid is
// the one this record was created for, by comparing the recorded start time
// with the one the system reports now. The pid is loaded from the pid file
// first if it is not known yet.
func (p *process) isSameProcess() (bool, error) {
	// Backward compatibility: without a recorded start time there is nothing
	// to compare, so the process is assumed to be the same.
	if p.startTime == "" {
		return true, nil
	}

	if p.pid == 0 {
		if _, err := p.getPidFromFile(); err != nil {
			return false, err
		}
	}

	current, err := p.readStartTime()
	if err != nil {
		return false, err
	}
	return current == p.startTime, nil
}

Signal()

Signal()可以向process发送信号。

1
2
3
4
// Signal sends s to the process identified by p.pid.
//
// s must be a syscall.Signal; any other os.Signal implementation is rejected
// with an error instead of panicking on the type assertion.
func (p *process) Signal(s os.Signal) error {
	sig, ok := s.(syscall.Signal)
	if !ok {
		return fmt.Errorf("unsupported signal type %T", s)
	}
	return syscall.Kill(p.pid, sig)
}

Start()

Start()会调用runc start id来启动一个容器。
container的Start()最终调用的是runc create(通过shim调用)。runc create和runc start两个命令配合起来,才会完成容器的启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Start runs "<runtime> <runtimeArgs...> start <container id>" for the init
// process. For any other process it does nothing and returns nil.
//
// The runtime command runs in a goroutine while Start waits for whichever
// happens first:
//   - the command finishes: its error (with combined output) or nil is returned;
//   - the shim command is done (cmdDoneCh closed): if the shim did not finish
//     successfully, the runtime command is killed and ErrShimExited is
//     returned; otherwise Start keeps waiting for the runtime command.
func (p *process) Start() error {
	if p.ID() != InitProcessID {
		return nil
	}
	var (
		errC = make(chan error, 1)
		args = append(p.container.runtimeArgs, "start", p.container.id)
		cmd  = exec.Command(p.container.runtime, args...)
	)
	go func() {
		out, err := cmd.CombinedOutput()
		if err != nil {
			errC <- fmt.Errorf("%s: %q", err.Error(), out)
			// Exactly one value is sent per run. A second send would block
			// forever once Start has returned without reading errC (the
			// ErrShimExited path), leaking this goroutine.
			return
		}
		// The runtime command succeeded.
		errC <- nil
	}()
	select {
	case err := <-errC:
		if err != nil {
			return err
		}
	case <-p.cmdDoneCh:
		// cmdDoneCh is closed once the shim command is done.
		if !p.cmdSuccess {
			if cmd.Process != nil {
				cmd.Process.Kill()
			}
			// NOTE(review): CombinedOutput in the goroutine above also waits
			// on cmd; confirm that calling Wait here as well is intended.
			cmd.Wait()
			return ErrShimExited
		}
		err := <-errC
		if err != nil {
			return err
		}
	}
	return nil
}

总结

Container对应的是容器,Process对应的是容器中的进程。Container的Start()和Exec()会调用containerd-shim,containerd-shim一般是个常驻进程,Container在containerd-shim退出时需要做清理工作:如果containerd-shim已经退出,但容器内的process还在运行,那么container会先杀死该进程,然后关闭cmdDoneCh,以通知等待方shim已经退出。