启动过程

containerd的main()函数定义在/containerd/main.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// main configures logging, builds the cli application for the containerd
// daemon (name, version, usage, flags), and runs it. The actual startup
// work is delegated to daemon() in app.Action.
func main() {
logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.RFC3339Nano})
app := cli.NewApp()
app.Name = "containerd"
// Append the git commit to the version string when it was set at build time.
if containerd.GitCommit != "" {
app.Version = fmt.Sprintf("%s commit: %s", containerd.Version, containerd.GitCommit)
} else {
app.Version = containerd.Version
}
app.Usage = usage
app.Flags = daemonFlags
// Before runs ahead of Action: install the stack-dump signal trap,
// enable debug logging/metrics when requested, optionally start the
// pprof endpoint, and check resource limits.
app.Before = func(context *cli.Context) error {
setupDumpStacksTrap()
if context.GlobalBool("debug") {
logrus.SetLevel(logrus.DebugLevel)
if context.GlobalDuration("metrics-interval") > 0 {
if err := debugMetrics(context.GlobalDuration("metrics-interval"), context.GlobalString("graphite-address")); err != nil {
return err
}
}
}
if p := context.GlobalString("pprof-address"); len(p) > 0 {
pprof.Enable(p)
}
if err := checkLimits(); err != nil {
return err
}
return nil
}
// Action runs the daemon; any startup error is fatal.
app.Action = func(context *cli.Context) {
if err := daemon(context); err != nil {
logrus.Fatal(err)
}
}
if err := app.Run(os.Args); err != nil {
logrus.Fatal(err)
}
}

containerd使用的是”github.com/codegangsta/cli”包,该包的用法如下:

1
2
3
4
5
6
7
8
9
10
// Minimal usage example of the codegangsta/cli package: create an app,
// set its metadata and Action callback, then Run it with os.Args.
func main() {
app := cli.NewApp()
app.Name = "greet"
app.Usage = "say a greeting"
app.Action = func(c *cli.Context) {
println("Greetings")
}
app.Run(os.Args)
}

可以使用cli.Context把命令行的参数传给整个程序。

可以从main()函数看出,containerd的启动核心是daemon(context)。所以我们来看下daemon()函数,daemon()函数同样定义在main.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// daemon is the core startup routine: it creates the supervisor, starts
// ten workers and the supervisor's task loop, launches the gRPC server,
// and then blocks handling termination signals.
func daemon(context *cli.Context) error {
s := make(chan os.Signal, 2048)
// Notify's first argument is the channel that receives signals; the
// remaining arguments select which signals to listen for (passing none
// means all signals are delivered).
signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
// Build the supervisor from the CLI options.
sv, err := supervisor.New(
context.String("state-dir"),
context.String("runtime"),
context.String("shim"),
context.StringSlice("runtime-args"),
context.Duration("start-timeout"),
context.Int("retain-count"))
if err != nil {
return err
}
wg := &sync.WaitGroup{}
// Start 10 workers that will consume the supervisor's startTasks channel.
for i := 0; i < 10; i++ {
wg.Add(1)
w := supervisor.NewWorker(sv, wg)
go w.Start()
}
// Start the supervisor's task-handling goroutine.
if err := sv.Start(); err != nil {
return err
}
// Split the listen string of the form proto://addr
listenSpec := context.String("listen")
listenParts := strings.SplitN(listenSpec, "://", 2)
if len(listenParts) != 2 {
return fmt.Errorf("bad listen address format %s, expected proto://address", listenSpec)
}
// Start the gRPC server on the requested protocol and address.
server, err := startServer(listenParts[0], listenParts[1], sv)
if err != nil {
return err
}
// Block until a termination signal arrives, then stop the server and exit.
for ss := range s {
switch ss {
default:
logrus.Infof("stopping containerd after receiving %s", ss)
server.Stop()
os.Exit(0)
}
}
return nil
}

在daemon()中,主要完成以下几件事情:

  1. 启动gRPCServer;
  2. 生成并启动supervisor;
  3. 生成并启动10个worker。

我们来先看gRPCServer。

启动gRPCServer

daemon()使用下面代码启动gRPCServer:

1
2
3
4
server, err := startServer(listenParts[0], listenParts[1], sv)
if err != nil {
return err
}

来看startServer()函数,定义在/main.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// startServer brings up the containerd gRPC API:
//  1. create the listening socket;
//  2. create the gRPC server;
//  3. register the request-handling implementations;
//  4. serve in a background goroutine.
func startServer(protocol, address string, sv *supervisor.Supervisor) (*grpc.Server, error) {
// TODO: We should use TLS.
// TODO: Add an option for the SocketGroup.
// 1. Create the listening socket.
sockets, err := listeners.Init(protocol, address, "", nil)
if err != nil {
return nil, err
}
if len(sockets) != 1 {
return nil, fmt.Errorf("incorrect number of listeners")
}
l := sockets[0]
// 2. Create the gRPC server.
s := grpc.NewServer()
// 3. Register the API handler (wrapping the supervisor) and the health service.
types.RegisterAPIServer(s, server.NewServer(sv))
healthServer := health.NewHealthServer()
grpc_health_v1.RegisterHealthServer(s, healthServer)
go func() {
logrus.Debugf("containerd: grpc api on %s", address)
// 4. Serve; this blocks inside the goroutine until the server stops.
if err := s.Serve(l); err != nil {
logrus.WithField("error", err).Fatal("containerd: serve grpc")
}
}()
return s, nil
}

启动gRPCServer主要有以下几个步骤:

  1. 生成socket;
  2. 生成gRPC server;
  3. 注册消息处理结构体;
  4. 启动gRPCServer,监听。

关于gRPC的具体用法,见”gRPC-demo”。

gRPCServer的消息处理结构体定义在/api/grpc/server/server.go中,可以使用NewServer()获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// NewServer returns a grpc server instance (an apiServer) that wraps the
// given supervisor; its methods translate RPC requests into supervisor tasks.
func NewServer(sv *supervisor.Supervisor) types.APIServer {
return &apiServer{
sv: sv,
}
}
```
在apiServer结构体中,定义有各处理方法,如CreateContainer(), AddProcess(), Signal(), State(), Stats()等。这些将在以后分析。
## supervisor
supervisor是containerd的核心,处理系统中的各种task,及记录着并监视着系统中每个container。supervisor定义在/supervisor/supervisor.go中:
``` Go
// Supervisor represents a container supervisor
type Supervisor struct {
// stateDir is the directory on the system to store container runtime state information.
stateDir string
// name of the OCI compatible runtime used to execute containers
runtime string
// extra arguments passed to the runtime binary
runtimeArgs []string
// path to the containerd-shim binary bridging containerd and the runtime
shim string
// containers currently tracked, keyed by container id
containers map[string]*containerInfo
// queue of start tasks consumed by the workers
startTasks chan *startTask
// we need a lock around the subscribers map only because additions and deletions from
// the map are via the API so we cannot really control the concurrency
subscriberLock sync.RWMutex
subscribers map[chan Event]struct{}
// information about the host machine
machine Machine
// queue of tasks dispatched by handleTask
tasks chan Task
// monitor watches container processes for exit and OOM events
monitor *Monitor
// in-memory log of events, guarded by eventLock
eventLog []Event
eventLock sync.Mutex
// timeout applied to container operations
timeout time.Duration
}

其中:

  • stateDir: containerd的工作目录,如/var/run/docker/libcontainerd/containerd;
  • runtime: 一般为runc;
  • shim: containerd-shim路径,shim为containerd和runtime之间的桥梁;
  • containers: 系统中的containers,key为id;
  • startTasks: startTask的channel,worker会自动处理该channel中的startTask;
  • subscribers: 管理event处理者,每个event处理者传进来的是一个channel,通过notifySubscribers()方法分发事件;
  • machine: 物理机相关信息;
  • tasks:task的channel;
  • monitor: 容器进程监视器;
  • eventLog: 记录系统中的event;

可以通过New()生成一个新的supervisor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// New returns an initialized Process supervisor.
// It creates the state directory, collects machine information, builds the
// process monitor, starts the exit/OOM handler goroutines, and restores any
// previously running containers from stateDir.
func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
startTasks := make(chan *startTask, 10)
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err
}
machine, err := CollectMachineInformation()
if err != nil {
return nil, err
}
monitor, err := NewMonitor()
if err != nil {
return nil, err
}
s := &Supervisor{
stateDir: stateDir,
containers: make(map[string]*containerInfo),
startTasks: startTasks,
machine: machine,
subscribers: make(map[chan Event]struct{}),
tasks: make(chan Task, defaultBufferSize),
monitor: monitor,
runtime: runtimeName,
runtimeArgs: runtimeArgs,
shim: shimName,
timeout: timeout,
}
// Set up the event log (retaining retainCount entries).
if err := setupEventLog(s, retainCount); err != nil {
return nil, err
}
go s.exitHandler()
go s.oomHandler()
if err := s.restore(); err != nil {
return nil, err
}
return s, nil
}

在New()函数中,会生成supervisor的monitor等,并启动exitHandler()及oomHandler(),分别处理已退出(被挂起)的进程及发生OOM的容器。monitor会监控容器进程的退出及OOM事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// exitHandler forwards every process reported on the monitor's exit
// channel into the supervisor's task queue as an ExitTask.
func (s *Supervisor) exitHandler() {
for p := range s.monitor.Exits() {
e := &ExitTask{
Process: p,
}
s.SendTask(e)
}
}
// oomHandler forwards every container id reported on the monitor's OOM
// channel into the supervisor's task queue as an OOMTask.
func (s *Supervisor) oomHandler() {
for id := range s.monitor.OOMs() {
e := &OOMTask{
ID: id,
}
s.SendTask(e)
}
}

supervisor通过Start()方法启动,Start()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Start launches a goroutine that drains the tasks channel, dispatching
// each task through handleTask(); it returns immediately.
func (s *Supervisor) Start() error {
logrus.WithFields(logrus.Fields{
"stateDir": s.stateDir,
"runtime": s.runtime,
"runtimeArgs": s.runtimeArgs,
"memory": s.machine.Memory,
"cpus": s.machine.Cpus,
}).Debug("containerd: supervisor running")
// Dispatch every task placed on the tasks channel via handleTask().
go func() {
for i := range s.tasks {
s.handleTask(i)
}
}()
return nil
}

在Start()中,会启动一个goroutine调用handleTask()方法处理tasks channel中的task。所以,只要把task放到supervisor的tasks channel中,supervisor就会处理该task。

handleTask()方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// handleTask dispatches a task to its type-specific handler. Unless the
// handler returned errDeferredResponse (meaning the reply will be sent
// later, e.g. by a worker), the result is pushed onto the task's error
// channel and that channel is closed.
func (s *Supervisor) handleTask(i Task) {
var err error
switch t := i.(type) {
case *AddProcessTask:
err = s.addProcess(t)
case *CreateCheckpointTask:
err = s.createCheckpoint(t)
case *DeleteCheckpointTask:
err = s.deleteCheckpoint(t)
// A StartTask is handled by start(), defined in create.go.
case *StartTask:
err = s.start(t)
// A DeleteTask is handled by delete().
case *DeleteTask:
err = s.delete(t)
case *ExitTask:
err = s.exit(t)
case *GetContainersTask:
err = s.getContainers(t)
case *SignalTask:
err = s.signal(t)
case *StatsTask:
err = s.stats(t)
case *UpdateTask:
err = s.updateContainer(t)
case *UpdateProcessTask:
err = s.updateProcess(t)
case *OOMTask:
err = s.oom(t)
default:
err = ErrUnknownTask
}
if err != errDeferredResponse {
i.ErrorCh() <- err
close(i.ErrorCh())
}
}

可以看出,handleTask()会依据task的类型来调用不同的处理函数。这里以StartTask来举例。

StartTask

StartTask定义在/supervisor/create.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// StartTask asks the supervisor to create and start a new container.
// The final result is delivered through the StartResponse channel.
type StartTask struct {
baseTask
ID string
BundlePath string
Stdout string
Stderr string
Stdin string
StartResponse chan StartResponse
Labels []string
NoPivotRoot bool
Checkpoint *runtime.Checkpoint
CheckpointDir string
Runtime string
RuntimeArgs []string
}

其对应的处理函数为同文件中start()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// start handles a StartTask: it creates the container, registers it in
// s.containers, and enqueues a (lowercase) startTask for the workers.
// It returns errDeferredResponse because the final reply is delivered by
// the worker through the task's channels.
func (s *Supervisor) start(t *StartTask) error {
start := time.Now()
rt := s.runtime
rtArgs := s.runtimeArgs
// A per-task runtime overrides the supervisor-wide default.
if t.Runtime != "" {
rt = t.Runtime
rtArgs = t.RuntimeArgs
}
// Create the container object.
container, err := runtime.New(runtime.ContainerOpts{
Root: s.stateDir,
ID: t.ID,
Bundle: t.BundlePath,
Runtime: rt,
RuntimeArgs: rtArgs,
Shim: s.shim,
Labels: t.Labels,
NoPivotRoot: t.NoPivotRoot,
Timeout: s.timeout,
})
if err != nil {
return err
}
s.containers[t.ID] = &containerInfo{
container: container,
}
ContainersCounter.Inc(1)
// Build the worker-side startTask (lowercase, defined in worker.go).
task := &startTask{
Err: t.ErrorCh(),
Container: container,
StartResponse: t.StartResponse,
Stdin: t.Stdin,
Stdout: t.Stdout,
Stderr: t.Stderr,
}
if t.Checkpoint != nil {
task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name)
}
// Enqueue the prepared task for the workers.
s.startTasks <- task
ContainerCreateTimer.UpdateSince(start)
return errDeferredResponse
}

在start()方法中,先生成container,并记录到supervisor的containers中,然后重新组装成startTask(此处startTask,s为小写,定义在/supervisor/worker.go中),放入startTasks channel中供worker处理。

其他task的定义及处理函数与StartTask类似,如DeleteTask定义在/supervisor/delete.go中,处理方法为delete();ExitTask定义在/supervisor/exit.go中,处理方法为exit()。

那么这些task是什么时候放入supervisor的tasks中的呢?在gRPCServer的处理结构体server的方法中(/api/grpc/server/server.go)。如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// CreateContainer handles the CreateContainer RPC: it builds a
// supervisor.StartTask from the request, sends it to the supervisor's
// task queue, and blocks until both the error channel and the
// StartResponse deliver a result. (The first parameter is the request
// context and the second the incoming request, per the standard gRPC
// handler signature.)
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
if c.BundlePath == "" {
return nil, errors.New("empty bundle path")
}
// Build the StartTask from the request fields.
e := &supervisor.StartTask{}
e.ID = c.Id
e.BundlePath = c.BundlePath
e.Stdin = c.Stdin
e.Stdout = c.Stdout
e.Stderr = c.Stderr
e.Labels = c.Labels
e.NoPivotRoot = c.NoPivotRoot
e.Runtime = c.Runtime
e.RuntimeArgs = c.RuntimeArgs
e.StartResponse = make(chan supervisor.StartResponse, 1)
if c.Checkpoint != "" {
e.CheckpointDir = c.CheckpointDir
e.Checkpoint = &runtime.Checkpoint{
Name: c.Checkpoint,
}
}
// Send the StartTask to the supervisor's tasks channel.
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
r := <-e.StartResponse
// Convert the started container into its API representation.
apiC, err := createAPIContainer(r.Container, false)
if err != nil {
// BUG FIX: was `return nil, errW` — an undeclared identifier;
// return the error from createAPIContainer instead.
return nil, err
}
return &types.CreateContainerResponse{
Container: apiC,
}, nil
}

CreateContainer()在把startTask放入tasks channel后,会一直等待StartResponse的返回。

worker

在main.go中的daemon(),生成了10个worker:

1
2
3
4
5
6
// Spawn 10 workers, each consuming the supervisor's startTasks channel.
for i := 0; i < 10; i++ {
wg.Add(1)
w := supervisor.NewWorker(sv, wg)
go w.Start()
}

worker可以处理supervisor的startTasks channel中的startTask。定义在/supervisor/worker.go中:

1
2
3
4
5
6
7
8
9
10
11
12
// NewWorker return a new initialized worker
// bound to the given supervisor and wait group.
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
return &worker{
s: s,
wg: wg,
}
}
// worker consumes startTasks from its supervisor; wg is signaled when
// the worker's loop exits.
type worker struct {
wg *sync.WaitGroup
s *Supervisor
}

可以通过Start()方法启动worker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Start runs a loop in charge of starting new containers
func (w *worker) Start() {
defer w.wg.Done()
// Consume and process tasks from the supervisor's startTasks channel.
for t := range w.s.startTasks {
started := time.Now()
// Create the container process via Container.Start(), which goes
// through containerd-shim to invoke "runc create".
process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"id": t.Container.ID(),
}).Error("containerd: start container")
t.Err <- err
// On failure, enqueue a DeleteTask to clean up.
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent: true,
Process: process,
}
w.s.SendTask(evt)
continue
}
// Register the container with the monitor for OOM events.
if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited {
if process.State() != runtime.Stopped {
logrus.WithField("error", err).Error("containerd: notify OOM events")
}
}
if err := w.s.monitorProcess(process); err != nil {
logrus.WithField("error", err).Error("containerd: add process to monitor")
t.Err <- err
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent: true,
Process: process,
}
w.s.SendTask(evt)
continue
}
// only call process start if we aren't restoring from a checkpoint
// if we have restored from a checkpoint then the process is already started
// process.Start() runs "runc start <id>" to actually start the container.
if t.CheckpointPath == "" {
if err := process.Start(); err != nil {
logrus.WithField("error", err).Error("containerd: start init process")
t.Err <- err
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent: true,
Process: process,
}
w.s.SendTask(evt)
continue
}
}
ContainerStartTimer.UpdateSince(started)
t.Err <- nil
// Deliver the result to the task's StartResponse channel.
t.StartResponse <- StartResponse{
Container: t.Container,
}
w.s.notifySubscribers(Event{
Timestamp: time.Now(),
ID: t.Container.ID(),
Type: StateStart,
})
}
}

Start()方法会消费supervisor的startTasks channel中的内容。对startTask,主要做以下处理:

  1. 调用Container.Start()生成容器;
  2. 调用monitor.MonitorOOM()和monitorProcess()监控容器进程;
  3. 调用process.Start()启动容器;
  4. 把处理结果返回给startTask的StartResponse channel。

monitor

monitor负责监控容器进程,定义在/supervisor/monitor_linux.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Monitor represents a runtime.Process monitor
type Monitor struct {
m sync.Mutex
// receivers maps an epoll fd to the monitored entity (a runtime.Process
// or a runtime.OOM), guarded by m.
receivers map[int]interface{}
// exits carries processes whose fd reported EPOLLHUP.
exits chan runtime.Process
// ooms carries ids of containers that hit an OOM condition.
ooms chan string
epollFd int
}
// NewMonitor creates a Monitor backed by an epoll instance and starts
// its event loop in a background goroutine.
func NewMonitor() (*Monitor, error) {
m := &Monitor{
receivers: make(map[int]interface{}),
exits: make(chan runtime.Process, 1024),
ooms: make(chan string, 1024),
}
fd, err := archutils.EpollCreate1(0)
if err != nil {
return nil, err
}
m.epollFd = fd
go m.start()
return m, nil
}

其中:

  • receivers: 标识被监控的容器进程;
  • exits: 退出的容器的id;
  • ooms:被oom的容器的id。

monitor可以使用start()方法启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// start is the monitor's event loop: it waits on the epoll fd and, for
// each ready fd, dispatches on the registered receiver's type — exited
// processes go to the exits channel, OOM'd container ids to ooms.
func (m *Monitor) start() {
var events [128]syscall.EpollEvent
for {
// EpollWait blocks until events fire on the registered fds.
n, err := archutils.EpollWait(m.epollFd, events[:], -1)
if err != nil {
if err == syscall.EINTR {
continue
}
logrus.WithField("error", err).Fatal("containerd: epoll wait")
}
// process events
for i := 0; i < n; i++ {
fd := int(events[i].Fd)
m.m.Lock()
r := m.receivers[fd]
switch t := r.(type) {
// The fd belongs to a monitored process.
case runtime.Process:
if events[i].Events == syscall.EPOLLHUP {
delete(m.receivers, fd)
if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
Events: syscall.EPOLLHUP,
Fd: int32(fd),
}); err != nil {
logrus.WithField("error", err).Error("containerd: epoll remove fd")
}
if err := t.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process IO")
}
EpollFdCounter.Dec(1)
// Publish the exited process on the exits channel.
m.exits <- t
}
// The fd belongs to an OOM watcher.
case runtime.OOM:
// always flush the event fd
t.Flush()
if t.Removed() {
delete(m.receivers, fd)
// epoll will remove the fd from its set after it has been closed
t.Close()
EpollFdCounter.Dec(1)
} else {
// Publish the OOM'd container's id on the ooms channel.
m.ooms <- t.ContainerID()
}
}
m.m.Unlock()
}
}
}

start()流程如下:

  1. 通过archutils.EpollWait()获取events;
  2. 从receivers中获取event对应的类型;
  3. 如果类型是runtime.Process,且事件为EPOLLHUP,则把该process放入到exits channel中,表示已被挂起;
  4. 如果类型为OOM,则把container id加入到ooms channel中。

可以通过Exits()方法获取exits channel中的内容;通过OOMs()方法获取ooms channel中的内容:

1
2
3
4
5
6
7
8
9
/ Exits returns the channel used to notify of a process exit
func (m *Monitor) Exits() chan runtime.Process {
return m.exits
}
// OOMs returns the channel used to notify of a container exit due to OOM
func (m *Monitor) OOMs() chan string {
return m.ooms
}

ooms及exits的内容在supervisor的exitHandler()及oomHandler()中会用到。

那么,monitor是如何注册监听事件的呢?答案是通过Monitor()及MonitorOOM()两个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Monitor adds a process to the list of the one being monitored
func (m *Monitor) Monitor(p runtime.Process) error {
m.m.Lock()
defer m.m.Unlock()
fd := p.ExitFD()
// Describe the epoll event: wake on hang-up of the process's exit fd.
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP,
}
// Register the fd with the monitor's epoll instance.
if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return err
}
EpollFdCounter.Inc(1)
// Record the receiver so start() can route events for this fd.
m.receivers[fd] = p
return nil
}
// MonitorOOM adds a container to the list of the ones monitored for OOM
func (m *Monitor) MonitorOOM(c runtime.Container) error {
m.m.Lock()
defer m.m.Unlock()
o, err := c.OOM()
if err != nil {
return err
}
fd := o.FD()
// Describe the epoll event: wake on input or hang-up of the OOM fd.
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP | syscall.EPOLLIN,
}
// Register the fd with the monitor's epoll instance.
if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return err
}
EpollFdCounter.Inc(1)
// Record the receiver so start() can route events for this fd.
m.receivers[fd] = o
return nil
}

两个方法的流程如下:

  1. 定义EpollEvent;
  2. 使用EpoolCtl注册事件;
  3. 把监听行为添加到receivers中。

关于Epoll机制,详见http://blog.csdn.net/xiajun07061225/article/details/9250579。

总结

  1. containerd使用gRPC机制对外提供服务,其内部核心是一个supervisor,其主要有tasks和startTasks两个channel。supervisor会自动处理tasks channel中的内容,处理的方法就是根据task类型的不同调用不同的处理方法。其中StartTask使用start()方法处理,会把StartTask处理成startTask,并放入startTasks channel中。
  2. worker会消费startTasks channel中的内容,先通过containerd-shim使用”runc create”创建容器,再使用”runc start”启动容器。
  3. containerd中的monitor会对运行的容器进行监控,并作出相应的处理。