containerd-shim的代码位于containerd的/containerd-shim目录下,独立编译成二进制。containerd-shim是一个常驻进程,负责容器中进程的启动,是容器进程的父进程。有了containerd-shim之后,容器进程的父进程是containerd-shim而不是containerd,容器进程与containerd之间不再有直接的父子联系。

main

先来看下containerd-shim的main()函数,定义在/containerd-shim/main.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// main is the shim entry point. It opens shim-log.json in the current working
// directory, hands it to start, and turns start's result into an exit status.
func main() {
	flag.Parse()
	// The working directory is the per-process state directory, e.g.
	// /run/docker/libcontainerd/containerd/nginx/init.
	wd, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	logFile, err := os.OpenFile(
		filepath.Join(wd, "shim-log.json"),
		os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC,
		0666,
	)
	if err != nil {
		panic(err)
	}
	switch err := start(logFile); err {
	case nil:
		// Nothing to report.
	case errRuntime:
		// The runtime failed to start the container and has already written
		// the details to its own log. The shim itself executed properly, so
		// this is not treated as a shim failure.
		logFile.Close()
	default:
		// Record the error in the log file rather than on stderr: the shim's
		// stdio is /dev/null because it is expected to be reparented to the
		// system init, so nobody would read it.
		writeMessage(logFile, "error", err)
		logFile.Close()
		os.Exit(1)
	}
}

main()的流程如下:

  1. 调用os.Getwd()获取进程目录,即containerd中设置好的进程目录,其中每个容器的init进程称为”init”;
  2. 打开进程目录下的“shim-log.json”(打开失败则直接panic);之后start()返回的错误(errRuntime除外)会被记录到该文件中;
  3. 调用start()执行进一步的操作。

start()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// start runs the shim's main loop. It installs signal handling, opens the
// "exit" and "control" files in the working directory, launches the runtime
// through newProcess/create, and then services signals and control messages
// until the runtime process is reaped.
//
// log receives warnings raised while closing the process. The returned error
// is whatever setup or create reported; nil is returned once the runtime has
// exited and remaining children have been waited for.
func start(log *os.File) error {
	// Subscribe before anything else so children are reaped properly even if
	// the runtime exits before the handler loop is reached.
	sigC := make(chan os.Signal, 2048)
	// No signal list is given, so every incoming signal is relayed to sigC.
	signal.Notify(sigC)
	// Make the shim the subreaper for orphaned processes created by the container.
	if err := osutils.SetSubreaper(1); err != nil {
		return err
	}
	// Open the exit pipe.
	exitPipe, err := os.OpenFile("exit", syscall.O_WRONLY, 0)
	if err != nil {
		return err
	}
	defer exitPipe.Close()
	ctl, err := os.OpenFile("control", syscall.O_RDWR, 0)
	if err != nil {
		return err
	}
	defer ctl.Close()
	// Positional arguments are passed as id, bundle and runtime name,
	// e.g. "nginx", "/home/fankang/mycontainer", "runc".
	proc, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := proc.Close(); closeErr != nil {
			writeMessage(log, "warn", closeErr)
		}
	}()
	// Build and run the runtime command.
	if err := proc.create(); err != nil {
		proc.delete()
		return err
	}
	// Decode "<type> <width> <height>" lines from the control file in the
	// background; lines that fail to parse are dropped.
	ctlMsgs := make(chan controlMessage, 32)
	go func() {
		for {
			var m controlMessage
			_, scanErr := fmt.Fscanf(ctl, "%d %d %d\n", &m.Type, &m.Width, &m.Height)
			if scanErr == nil {
				ctlMsgs <- m
			}
		}
	}()
	runtimeExited := false
	for {
		select {
		case sig := <-sigC:
			if sig == syscall.SIGCHLD {
				exits, _ := osutils.Reap(false)
				for _, e := range exits {
					// Check whether the runtime is among the exited processes.
					if e.Pid == proc.pid() {
						runtimeExited = true
						writeInt("exitStatus", e.Status)
					}
				}
			}
			// The runtime has exited, so the shim can exit as well.
			if runtimeExited {
				// Let containerd take care of calling the runtime delete.
				// The exit pipe is closed first so that the Reap call below
				// does not block until all children of the container have
				// died if init was not started in its own PID namespace.
				exitPipe.Close()
				// Wait for all the children this process may have created
				// (needed for exec and init processes when they join another
				// pid namespace).
				osutils.Reap(true)
				proc.Wait()
				return nil
			}
		case m := <-ctlMsgs:
			switch m.Type {
			case 0:
				// Close stdin.
				if proc.stdinCloser != nil {
					proc.stdinCloser.Close()
				}
			case 1:
				// Resize the console; ignored when there is no console.
				if proc.console != nil {
					size := term.Winsize{
						Width:  uint16(m.Width),
						Height: uint16(m.Height),
					}
					term.SetWinsize(proc.console.Fd(), &size)
				}
			}
		}
	}
}

start()的流程如下:

  1. 调用newProcess()生成process;
  2. 调用p.create()创建并执行runc命令;
  3. 监听进程目录下control中的指令,其中0为关闭进程输入,1为调整窗口大小(还不清楚为什么这么设计);
  4. 监听进程的事件,并作出处理。

所以可以看出,一般来说,containerd-shim不会主动退出:只有在收到SIGCHLD并确认所监控的进程(p.pid())已经退出之后,start()才会返回,containerd-shim随之退出。

Process

containerd-shim的process定义在/containerd-shim/process.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// processState mirrors the contents of process.json, which loadProcess
// decodes from the shim's working directory.
type processState struct {
// Embedded process spec; its fields (for example Terminal, read by openIO)
// are promoted onto processState.
specs.ProcessSpec
// Exec makes create issue the runtime's "exec" command instead of "create".
Exec bool `json:"exec"`
// Stdin, Stdout and Stderr are the paths openIO opens to carry the process's
// standard streams.
Stdin string `json:"containerdStdin"`
Stdout string `json:"containerdStdout"`
Stderr string `json:"containerdStderr"`
// RuntimeArgs are appended after the global --log/--log-format flags when
// create builds the runtime command line.
RuntimeArgs []string `json:"runtimeArgs"`
// NoPivotRoot adds "--no-pivot" to the create and restore command lines.
NoPivotRoot bool `json:"noPivotRoot"`
// CheckpointPath, when non-empty, makes newProcess load a checkpoint from it
// and create pass it to "restore" as --image-path.
CheckpointPath string `json:"checkpoint"`
// RootUID and RootGID are handed to newConsole / initializeIO by openIO.
RootUID int `json:"rootUID"`
RootGID int `json:"rootGID"`
}
// process is the shim's in-memory handle for the single runtime process it
// launches and monitors.
type process struct {
// Embedded WaitGroup: openIO registers each output-copy goroutine on it and
// start calls Wait before returning.
sync.WaitGroup
// id and bundle come from newProcess's arguments and are passed to the
// runtime command line by create.
id string
bundle string
// stdio is allocated by openIO; create hands its stdin/stdout/stderr to the
// runtime command.
stdio *stdio
// NOTE(review): not referenced in the code shown here (create reads
// p.state.Exec instead) — confirm whether it is still used.
exec bool
// containerPid is parsed from the "pid" file once the runtime command finishes.
containerPid int
// checkpoint and checkpointPath are filled in by newProcess when
// state.CheckpointPath is non-empty; create then issues "restore".
checkpoint *checkpoint
checkpointPath string
// shimIO is the result of initializeIO, set by openIO in non-terminal mode.
shimIO *IO
// stdinCloser is a write-only handle on state.Stdin opened in the background
// by openIO; start closes it on control message type 0.
stdinCloser io.Closer
// console and consolePath are the master file and console path returned by
// newConsole in terminal mode; consolePath is passed to the runtime as --console.
console *os.File
consolePath string
// state is the decoded process.json.
state *processState
// runtime is the runtime binary name executed by create (e.g. "runc").
runtime string
}

其中processState对应进程的process.json。

newProcess()

newProcess()调用loadProcess()从process.json中读取processState,并打开进程的标准输入输出,然后生成process并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// newProcess builds the shim's process handle for the given container id,
// bundle path and runtime binary name. It loads process.json via loadProcess,
// loads the checkpoint when the state names one, and opens the process's
// stdio via openIO. Any failure along the way is returned with a nil process.
func newProcess(id, bundle, runtimeName string) (*process, error) {
	state, err := loadProcess()
	if err != nil {
		return nil, err
	}
	proc := &process{
		id:      id,
		bundle:  bundle,
		runtime: runtimeName,
		state:   state,
	}
	// A non-empty checkpoint path means this process is to be restored.
	if path := state.CheckpointPath; path != "" {
		cpt, err := loadCheckpoint(path)
		if err != nil {
			return nil, err
		}
		proc.checkpoint = cpt
		proc.checkpointPath = path
	}
	if err = proc.openIO(); err != nil {
		return nil, err
	}
	return proc, nil
}
// loadProcess opens process.json in the current working directory and decodes
// it into a processState. Open and decode errors are returned unchanged.
func loadProcess() (*processState, error) {
	file, err := os.Open("process.json")
	if err != nil {
		return nil, err
	}
	defer file.Close()
	state := new(processState)
	if err = json.NewDecoder(file).Decode(state); err != nil {
		return nil, err
	}
	return state, nil
}

create()

create的流程如下:

  1. 根据processState,即containerd生成的process.json,生成runc命令;
  2. 执行runc命令,等待runc命令执行完成;
  3. 获取pid文件中的pid,更新process的containerPid(pid由runc写入)。

步骤1中的runc命令生成流程如下:

  1. 如果exec为true,则runc命令为“runc exec”,如,“runc --log /run/docker/libcontainerd/containerd/nginx/4067/log.json --log-format json exec -d --process /run/docker/libcontainerd/containerd/nginx/4067/process.json --console /dev/pts/6 --pid-file /run/docker/libcontainerd/containerd/nginx/4067/pid nginx”
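  2. 否则,如果checkpoint不为空,则runc命令为“runc restore”,并带上--image-path、--work-path,再根据checkpoint的设置追加--shell-job、--tcp-established、--ext-unix-sk、--empty-ns等参数(checkpoint不是研究重点,所以现在命令还不大熟,以后补充);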
  3. 否则,runc命令为“runc create”,如,“runc --log /run/docker/libcontainerd/containerd/nginx/init/log.json --log-format json create --bundle /home/fankang/mycontainer --console /dev/pts/5 --pid-file /run/docker/libcontainerd/containerd/nginx/init/pid nginx”。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    // create translates the process state into a runtime command line, runs
    // it to completion, and records the pid the runtime wrote to the "pid"
    // file.
    //
    // The sub-command is "exec" when p.state.Exec is set, "restore" when a
    // checkpoint was loaded, and "create" otherwise. errRuntime is returned
    // when the runtime ran but exited with a failure status.
    func (p *process) create() error {
    	workDir, err := os.Getwd()
    	if err != nil {
    		return err
    	}
    	// Global flags first, then any caller-supplied runtime arguments.
    	args := []string{
    		"--log", filepath.Join(workDir, "log.json"),
    		"--log-format", "json",
    	}
    	args = append(args, p.state.RuntimeArgs...)
    	add := func(flags ...string) {
    		args = append(args, flags...)
    	}
    	switch {
    	case p.state.Exec:
    		add("exec",
    			"-d",
    			"--process", filepath.Join(workDir, "process.json"),
    			"--console", p.consolePath,
    		)
    	case p.checkpoint != nil:
    		add("restore",
    			"-d",
    			"--image-path", p.checkpointPath,
    			"--work-path", filepath.Join(p.checkpointPath, "criu.work", "restore-"+time.Now().Format(time.RFC3339)),
    		)
    		if p.checkpoint.Shell {
    			add("--shell-job")
    		}
    		if p.checkpoint.TCP {
    			add("--tcp-established")
    		}
    		if p.checkpoint.UnixSockets {
    			add("--ext-unix-sk")
    		}
    		if p.state.NoPivotRoot {
    			add("--no-pivot")
    		}
    		for _, ns := range p.checkpoint.EmptyNS {
    			add("--empty-ns", ns)
    		}
    	default:
    		add("create",
    			"--bundle", p.bundle,
    			"--console", p.consolePath,
    		)
    		if p.state.NoPivotRoot {
    			add("--no-pivot")
    		}
    	}
    	add("--pid-file", filepath.Join(workDir, "pid"), p.id)
    	// Resulting command, for example:
    	// runc --log /run/docker/libcontainerd/containerd/nginx/init/log.json --log-format json create --bundle /home/fankang/mycontainer --console /dev/pts/5 --pid-file /run/docker/libcontainerd/containerd/nginx/init/pid nginx
    	cmd := exec.Command(p.runtime, args...)
    	cmd.Dir = p.bundle
    	cmd.Stdin = p.stdio.stdin
    	cmd.Stdout = p.stdio.stdout
    	cmd.Stderr = p.stdio.stderr
    	// SysProcAttr contents are platform specific, so setPDeathSig supplies them.
    	cmd.SysProcAttr = setPDeathSig()
    	if err := cmd.Start(); err != nil {
    		// Report a missing runtime binary with a friendlier message.
    		exErr, ok := err.(*exec.Error)
    		if ok && (exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist) {
    			return fmt.Errorf("%s not installed on system", p.runtime)
    		}
    		return err
    	}
    	// The command has been started; the shim's own handles on the output
    	// streams are released here.
    	p.stdio.stdout.Close()
    	p.stdio.stderr.Close()
    	// Wait for the runtime command itself to finish.
    	if err := cmd.Wait(); err != nil {
    		if _, exited := err.(*exec.ExitError); exited {
    			return errRuntime
    		}
    		return err
    	}
    	// The pid file holds the pid written by the runtime, e.g. 9947.
    	raw, err := ioutil.ReadFile("pid")
    	if err != nil {
    		return err
    	}
    	containerPid, err := strconv.Atoi(string(raw))
    	if err != nil {
    		return err
    	}
    	p.containerPid = containerPid
    	return nil
    }

openIO()

openIO()处理进程的标准输入输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// openIO connects the process's standard streams to the paths recorded in
// p.state (Stdin, Stdout, Stderr).
//
// In terminal mode (p.state.Terminal) a console is created with newConsole;
// the Stdin path is copied into the console master and the master is copied
// out to the Stdout path. Otherwise initializeIO supplies the shim-side
// streams, whose Stdout/Stderr are copied to the matching paths and whose
// Stdin is fed from the Stdin path.
//
// Every output copier is registered on the embedded WaitGroup so callers can
// Wait for output to drain. The first open/initialization error is returned.
func (p *process) openIO() error {
	p.stdio = &stdio{}
	var (
		uid = p.state.RootUID
		gid = p.state.RootGID
	)
	// Open a write-only handle on stdin in the background and keep it so
	// stdin can be closed on request later. NOTE(review): presumably done in
	// a goroutine because the open can block until a reader exists — confirm.
	go func() {
		if stdinCloser, err := os.OpenFile(p.state.Stdin, syscall.O_WRONLY, 0); err == nil {
			p.stdinCloser = stdinCloser
		}
	}()
	if p.state.Terminal {
		master, console, err := newConsole(uid, gid)
		if err != nil {
			return err
		}
		p.console = master
		p.consolePath = console
		stdin, err := os.OpenFile(p.state.Stdin, syscall.O_RDONLY, 0)
		if err != nil {
			return err
		}
		go io.Copy(master, stdin)
		stdout, err := os.OpenFile(p.state.Stdout, syscall.O_RDWR, 0)
		if err != nil {
			return err
		}
		p.Add(1)
		go func() {
			io.Copy(stdout, master)
			master.Close()
			p.Done()
		}()
		return nil
	}
	i, err := p.initializeIO(uid)
	if err != nil {
		return err
	}
	p.shimIO = i
	// non-tty
	//
	// An ordered slice is used instead of a map keyed by path. With a map,
	// identical Stdout and Stderr paths collapse into a single entry, so the
	// stdout stream would never be drained, and the open order would be random.
	outputs := []struct {
		path string
		src  io.Reader
	}{
		{p.state.Stdout, i.Stdout},
		{p.state.Stderr, i.Stderr},
	}
	for _, out := range outputs {
		dst, err := os.OpenFile(out.path, syscall.O_RDWR, 0)
		if err != nil {
			return err
		}
		src := out.src
		p.Add(1)
		go func() {
			io.Copy(dst, src)
			p.Done()
		}()
	}
	f, err := os.OpenFile(p.state.Stdin, syscall.O_RDONLY, 0)
	if err != nil {
		return err
	}
	go func() {
		io.Copy(i.Stdin, f)
		i.Stdin.Close()
	}()
	return nil
}