## Startup process

containerd's main() function is defined in /containerd/main.go:

```go
func main() {
	logrus.SetFormatter(&logrus.TextFormatter{TimestampFormat: time.RFC3339Nano})
	app := cli.NewApp()
	app.Name = "containerd"
	if containerd.GitCommit != "" {
		app.Version = fmt.Sprintf("%s commit: %s", containerd.Version, containerd.GitCommit)
	} else {
		app.Version = containerd.Version
	}
	app.Usage = usage
	app.Flags = daemonFlags
	app.Before = func(context *cli.Context) error {
		setupDumpStacksTrap()
		if context.GlobalBool("debug") {
			logrus.SetLevel(logrus.DebugLevel)
			if context.GlobalDuration("metrics-interval") > 0 {
				if err := debugMetrics(context.GlobalDuration("metrics-interval"), context.GlobalString("graphite-address")); err != nil {
					return err
				}
			}
		}
		if p := context.GlobalString("pprof-address"); len(p) > 0 {
			pprof.Enable(p)
		}
		if err := checkLimits(); err != nil {
			return err
		}
		return nil
	}

	app.Action = func(context *cli.Context) {
		if err := daemon(context); err != nil {
			logrus.Fatal(err)
		}
	}
	if err := app.Run(os.Args); err != nil {
		logrus.Fatal(err)
	}
}
```
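containerd uses the "github.com/codegangsta/cli" package, which is used as follows:

```go
package main

import (
	"os"

	"github.com/codegangsta/cli"
)

func main() {
	app := cli.NewApp()
	app.Name = "greet"
	app.Usage = "say a greeting"
	// Action runs when the program is invoked without a subcommand.
	app.Action = func(c *cli.Context) {
		println("Greetings")
	}

	app.Run(os.Args)
}
```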
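cli.Context carries the parsed command-line arguments to the rest of the program.

As main() shows, the core of containerd's startup is daemon(context), so let's look at the daemon() function next. It is also defined in main.go: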
```go
func daemon(context *cli.Context) error {
	s := make(chan os.Signal, 2048)
	signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
	sv, err := supervisor.New(
		context.String("state-dir"),
		context.String("runtime"),
		context.String("shim"),
		context.StringSlice("runtime-args"),
		context.Duration("start-timeout"),
		context.Int("retain-count"))
	if err != nil {
		return err
	}
	wg := &sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		w := supervisor.NewWorker(sv, wg)
		go w.Start()
	}
	if err := sv.Start(); err != nil {
		return err
	}
	listenSpec := context.String("listen")
	listenParts := strings.SplitN(listenSpec, "://", 2)
	if len(listenParts) != 2 {
		return fmt.Errorf("bad listen address format %s, expected proto://address", listenSpec)
	}
	server, err := startServer(listenParts[0], listenParts[1], sv)
	if err != nil {
		return err
	}
	for ss := range s {
		switch ss {
		default:
			logrus.Infof("stopping containerd after receiving %s", ss)
			server.Stop()
			os.Exit(0)
		}
	}
	return nil
}
```
daemon() mainly does the following:
- starts the gRPC server;
- creates and starts the supervisor;
- creates and starts 10 workers.
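Before the gRPC server can start, daemon() has to split the `listen` option into a protocol and an address. The sketch below isolates that step. The helper name `parseListen` and the socket path are assumptions made for illustration; only the strings.SplitN check itself comes from daemon().

```go
package main

import (
	"fmt"
	"strings"
)

// parseListen splits a "proto://address" listen spec into its two parts,
// using the same strings.SplitN check as daemon(). The function name is
// hypothetical; containerd does this inline.
func parseListen(spec string) (proto, addr string, err error) {
	parts := strings.SplitN(spec, "://", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("bad listen address format %s, expected proto://address", spec)
	}
	return parts[0], parts[1], nil
}

func main() {
	// The socket path is only an illustrative value.
	proto, addr, err := parseListen("unix:///run/containerd/containerd.sock")
	fmt.Println(proto, addr, err)

	// A spec without "://" is rejected.
	_, _, err = parseListen("containerd.sock")
	fmt.Println(err)
}
```

Limiting SplitN to two parts means an address that itself contains "://" is left untouched in the second part.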
Let's look at the gRPC server first.

## Starting the gRPC server

daemon() starts the gRPC server with the following code:

```go
server, err := startServer(listenParts[0], listenParts[1], sv)
if err != nil {
	return err
}
```
Now look at the startServer() function, which is also defined in main.go:

```go
func startServer(protocol, address string, sv *supervisor.Supervisor) (*grpc.Server, error) {
	sockets, err := listeners.Init(protocol, address, "", nil)
	if err != nil {
		return nil, err
	}
	if len(sockets) != 1 {
		return nil, fmt.Errorf("incorrect number of listeners")
	}
	l := sockets[0]
	s := grpc.NewServer()
	types.RegisterAPIServer(s, server.NewServer(sv))
	healthServer := health.NewHealthServer()
	grpc_health_v1.RegisterHealthServer(s, healthServer)

	go func() {
		logrus.Debugf("containerd: grpc api on %s", address)
		if err := s.Serve(l); err != nil {
			logrus.WithField("error", err).Fatal("containerd: serve grpc")
		}
	}()
	return s, nil
}
```
Starting the gRPC server involves the following steps:
- create the socket;
- create the gRPC server;
- register the message-handling struct;
- start the gRPC server and listen.
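The same four steps can be tried out without pulling in gRPC. The sketch below swaps in the standard library's net/rpc package, so it is an analogy rather than containerd's code: the `Echo` handler type, its `Ping` method, and the `ping` helper are all hypothetical names, and the listener is a loopback TCP socket instead of a unix socket.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Echo is a hypothetical handler type standing in for apiServer.
type Echo struct{}

// Ping has the (args, *reply) error shape that net/rpc requires.
func (Echo) Ping(msg string, reply *string) error {
	*reply = "pong: " + msg
	return nil
}

// startServer follows the four steps: listen, create, register, serve.
func startServer(protocol, address string) (net.Listener, error) {
	l, err := net.Listen(protocol, address) // 1. create the socket
	if err != nil {
		return nil, err
	}
	s := rpc.NewServer() // 2. create the server
	// 3. register the message-handling struct
	if err := s.RegisterName("Echo", Echo{}); err != nil {
		l.Close()
		return nil, err
	}
	go s.Accept(l) // 4. serve in a background goroutine
	return l, nil
}

// ping dials the server and performs a single call.
func ping(l net.Listener, msg string) (string, error) {
	c, err := rpc.Dial(l.Addr().Network(), l.Addr().String())
	if err != nil {
		return "", err
	}
	defer c.Close()
	var reply string
	err = c.Call("Echo.Ping", msg, &reply)
	return reply, err
}

func main() {
	l, err := startServer("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()
	fmt.Println(ping(l, "hello"))
}
```

As in startServer(), serving happens in its own goroutine so that the caller gets the server handle back immediately and can stop it later.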
For details on how to use gRPC, see "gRPC-demo".

The gRPC server's message-handling struct is defined in /api/grpc/server/server.go and can be obtained with NewServer():

```go
func NewServer(sv *supervisor.Supervisor) types.APIServer {
	return &apiServer{
		sv: sv,
	}
}
```

The apiServer struct defines the handler methods, such as CreateContainer(), AddProcess(), Signal(), State(), and Stats(). These will be analyzed later.

## supervisor

The supervisor is the core of containerd: it handles the various tasks in the system, and it records and watches every container in the system. It is defined in /supervisor/supervisor.go:

```go
type Supervisor struct {
	stateDir       string
	runtime        string
	runtimeArgs    []string
	shim           string
	containers     map[string]*containerInfo
	startTasks     chan *startTask
	subscriberLock sync.RWMutex
	subscribers    map[chan Event]struct{}
	machine        Machine
	tasks          chan Task
	monitor        *Monitor
	eventLog       []Event
	eventLock      sync.Mutex
	timeout        time.Duration
}
```
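Where:
- stateDir: containerd's working directory, e.g. /var/run/docker/libcontainerd/containerd;
- runtime: usually runc;
- shim: the path to containerd-shim; the shim is the bridge between containerd and the runtime;
- containers: the containers in the system, keyed by id;
- startTasks: the channel of startTask values; workers pick up the startTasks in this channel automatically;
- subscribers: the set of event consumers; each consumer registers a channel, and events are distributed through the notifySubscribers() method;
- machine: information about the physical machine;
- tasks: the channel of tasks;
- monitor: the container process monitor;
- eventLog: the record of events in the system.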
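To make the subscribers field more concrete, here is a minimal sketch of fanning one event out to every registered channel. It is not the containerd implementation: the `hub` type, the `subscribe` helper, the trimmed `Event` fields, and the choice of a non-blocking send are assumptions of this sketch; only the `map[chan Event]struct{}` shape and the RWMutex come from the struct above.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed-down stand-in for the supervisor's Event type.
type Event struct {
	ID   string
	Type string
}

// hub holds only the subscriber-related fields of Supervisor.
type hub struct {
	subscriberLock sync.RWMutex
	subscribers    map[chan Event]struct{}
}

func newHub() *hub {
	return &hub{subscribers: make(map[chan Event]struct{})}
}

// subscribe registers a new channel with the given buffer size.
func (h *hub) subscribe(buf int) chan Event {
	ch := make(chan Event, buf)
	h.subscriberLock.Lock()
	h.subscribers[ch] = struct{}{}
	h.subscriberLock.Unlock()
	return ch
}

// notifySubscribers offers e to every subscriber and returns how many
// accepted it. Using a non-blocking send is an assumption of this sketch.
func (h *hub) notifySubscribers(e Event) int {
	h.subscriberLock.RLock()
	defer h.subscriberLock.RUnlock()
	sent := 0
	for ch := range h.subscribers {
		select {
		case ch <- e:
			sent++
		default:
			// The subscriber is not keeping up; drop the event for it.
		}
	}
	return sent
}

func main() {
	h := newHub()
	a, b := h.subscribe(1), h.subscribe(1)
	// The event type string is only an illustrative value.
	fmt.Println(h.notifySubscribers(Event{ID: "c1", Type: "start-container"}))
	fmt.Println((<-a).ID, (<-b).ID)
}
```

Using the channel itself as the map key makes unsubscribing a plain delete() with no extra bookkeeping.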
A new supervisor is created with New():

```go
func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
	startTasks := make(chan *startTask, 10)
	if err := os.MkdirAll(stateDir, 0755); err != nil {
		return nil, err
	}
	machine, err := CollectMachineInformation()
	if err != nil {
		return nil, err
	}
	monitor, err := NewMonitor()
	if err != nil {
		return nil, err
	}
	s := &Supervisor{
		stateDir:    stateDir,
		containers:  make(map[string]*containerInfo),
		startTasks:  startTasks,
		machine:     machine,
		subscribers: make(map[chan Event]struct{}),
		tasks:       make(chan Task, defaultBufferSize),
		monitor:     monitor,
		runtime:     runtimeName,
		runtimeArgs: runtimeArgs,
		shim:        shimName,
		timeout:     timeout,
	}
	if err := setupEventLog(s, retainCount); err != nil {
		return nil, err
	}
	go s.exitHandler()
	go s.oomHandler()
	if err := s.restore(); err != nil {
		return nil, err
	}
	return s, nil
}
```
New() creates the supervisor's monitor, among other things, and starts exitHandler() and oomHandler(). These handle processes whose exit FD has hung up (that is, processes that have exited) and containers that ran out of memory (OOM), respectively. The monitor is what detects both kinds of event.

```go
func (s *Supervisor) exitHandler() {
	for p := range s.monitor.Exits() {
		e := &ExitTask{
			Process: p,
		}
		s.SendTask(e)
	}
}

func (s *Supervisor) oomHandler() {
	for id := range s.monitor.OOMs() {
		e := &OOMTask{
			ID: id,
		}
		s.SendTask(e)
	}
}
```
The supervisor is started with its Start() method, defined as follows:

```go
func (s *Supervisor) Start() error {
	logrus.WithFields(logrus.Fields{
		"stateDir":    s.stateDir,
		"runtime":     s.runtime,
		"runtimeArgs": s.runtimeArgs,
		"memory":      s.machine.Memory,
		"cpus":        s.machine.Cpus,
	}).Debug("containerd: supervisor running")
	go func() {
		for i := range s.tasks {
			s.handleTask(i)
		}
	}()
	return nil
}
```
Start() launches a goroutine that calls handleTask() for each task in the tasks channel. So putting a task into the supervisor's tasks channel is all it takes for the supervisor to process it.

handleTask() is defined as follows:

```go
func (s *Supervisor) handleTask(i Task) {
	var err error
	switch t := i.(type) {
	case *AddProcessTask:
		err = s.addProcess(t)
	case *CreateCheckpointTask:
		err = s.createCheckpoint(t)
	case *DeleteCheckpointTask:
		err = s.deleteCheckpoint(t)
	case *StartTask:
		err = s.start(t)
	case *DeleteTask:
		err = s.delete(t)
	case *ExitTask:
		err = s.exit(t)
	case *GetContainersTask:
		err = s.getContainers(t)
	case *SignalTask:
		err = s.signal(t)
	case *StatsTask:
		err = s.stats(t)
	case *UpdateTask:
		err = s.updateContainer(t)
	case *UpdateProcessTask:
		err = s.updateProcess(t)
	case *OOMTask:
		err = s.oom(t)
	default:
		err = ErrUnknownTask
	}
	// A handler that returns errDeferredResponse will answer on the
	// error channel later, so nothing is sent here in that case.
	if err != errDeferredResponse {
		i.ErrorCh() <- err
		close(i.ErrorCh())
	}
}
```
As you can see, handleTask() calls a different handler depending on the type of the task. We will use StartTask as the example here.
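The dispatch pattern can be reduced to a runnable sketch: one goroutine drains a channel of tasks, a type switch picks the handler, and the result goes back on the task's own error channel. Everything here is simplified. The `handled` slice, the `newBase` helper, and the `unknownTask` type are hypothetical additions, and the two task types carry only an ID.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is the minimal interface the loop needs.
type Task interface {
	ErrorCh() chan error
}

// baseTask supplies ErrorCh to every concrete task through embedding.
type baseTask struct{ errCh chan error }

func (t *baseTask) ErrorCh() chan error { return t.errCh }

func newBase() baseTask { return baseTask{errCh: make(chan error, 1)} }

// Simplified task types used only in this sketch.
type SignalTask struct {
	baseTask
	ID string
}
type StatsTask struct {
	baseTask
	ID string
}
type unknownTask struct{ baseTask }

var ErrUnknownTask = errors.New("unknown task type")

type Supervisor struct {
	tasks   chan Task
	handled []string // hypothetical record of what was dispatched
}

// SendTask queues a task for the supervisor goroutine.
func (s *Supervisor) SendTask(t Task) { s.tasks <- t }

// handleTask picks a handler by concrete type and reports the result.
func (s *Supervisor) handleTask(i Task) {
	var err error
	switch t := i.(type) {
	case *SignalTask:
		s.handled = append(s.handled, "signal:"+t.ID)
	case *StatsTask:
		s.handled = append(s.handled, "stats:"+t.ID)
	default:
		err = ErrUnknownTask
	}
	i.ErrorCh() <- err
	close(i.ErrorCh())
}

// Start launches the single goroutine that processes all tasks.
func (s *Supervisor) Start() {
	go func() {
		for t := range s.tasks {
			s.handleTask(t)
		}
	}()
}

func main() {
	s := &Supervisor{tasks: make(chan Task, 16)}
	s.Start()

	sig := &SignalTask{baseTask: newBase(), ID: "c1"}
	s.SendTask(sig)
	fmt.Println(<-sig.ErrorCh())

	bad := &unknownTask{newBase()}
	s.SendTask(bad)
	fmt.Println(<-bad.ErrorCh())
}
```

Because a single goroutine runs every handler, the handlers can touch the supervisor's state without taking a lock.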
## StartTask

StartTask is defined in /supervisor/create.go:

```go
type StartTask struct {
	baseTask
	ID            string
	BundlePath    string
	Stdout        string
	Stderr        string
	Stdin         string
	StartResponse chan StartResponse
	Labels        []string
	NoPivotRoot   bool
	Checkpoint    *runtime.Checkpoint
	CheckpointDir string
	Runtime       string
	RuntimeArgs   []string
}
```
Its handler is the start() method in the same file:

```go
func (s *Supervisor) start(t *StartTask) error {
	start := time.Now()
	rt := s.runtime
	rtArgs := s.runtimeArgs
	if t.Runtime != "" {
		rt = t.Runtime
		rtArgs = t.RuntimeArgs
	}
	container, err := runtime.New(runtime.ContainerOpts{
		Root:        s.stateDir,
		ID:          t.ID,
		Bundle:      t.BundlePath,
		Runtime:     rt,
		RuntimeArgs: rtArgs,
		Shim:        s.shim,
		Labels:      t.Labels,
		NoPivotRoot: t.NoPivotRoot,
		Timeout:     s.timeout,
	})
	if err != nil {
		return err
	}
	s.containers[t.ID] = &containerInfo{
		container: container,
	}
	ContainersCounter.Inc(1)
	task := &startTask{
		Err:           t.ErrorCh(),
		Container:     container,
		StartResponse: t.StartResponse,
		Stdin:         t.Stdin,
		Stdout:        t.Stdout,
		Stderr:        t.Stderr,
	}
	if t.Checkpoint != nil {
		task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name)
	}

	s.startTasks <- task
	ContainerCreateTimer.UpdateSince(start)
	return errDeferredResponse
}
```
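The start() method first creates the container and records it in the supervisor's containers map. It then repackages the request as a startTask (note the lowercase s; this type is defined in /supervisor/worker.go) and puts it into the startTasks channel for a worker to process.

The other tasks are defined and handled in the same way as StartTask. For example, DeleteTask is defined in /supervisor/delete.go and handled by delete(), and ExitTask is defined in /supervisor/exit.go and handled by exit().

So when are these tasks put into the supervisor's tasks channel? In the methods of apiServer, the gRPC server's handler struct (/api/grpc/server/server.go). For example: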
```go
// The first parameter is the context and the second is the incoming
// request; this is the fixed signature of a gRPC handler.
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
	if c.BundlePath == "" {
		return nil, errors.New("empty bundle path")
	}
	// Build the StartTask.
	e := &supervisor.StartTask{}
	e.ID = c.Id
	e.BundlePath = c.BundlePath
	e.Stdin = c.Stdin
	e.Stdout = c.Stdout
	e.Stderr = c.Stderr
	e.Labels = c.Labels
	e.NoPivotRoot = c.NoPivotRoot
	e.Runtime = c.Runtime
	e.RuntimeArgs = c.RuntimeArgs
	e.StartResponse = make(chan supervisor.StartResponse, 1)
	if c.Checkpoint != "" {
		e.CheckpointDir = c.CheckpointDir
		e.Checkpoint = &runtime.Checkpoint{
			Name: c.Checkpoint,
		}
	}
	// Send the StartTask to the supervisor's tasks channel.
	s.sv.SendTask(e)
	if err := <-e.ErrorCh(); err != nil {
		return nil, err
	}
	r := <-e.StartResponse
	// Build the API representation of the container.
	apiC, err := createAPIContainer(r.Container, false)
	if err != nil {
		return nil, err
	}
	return &types.CreateContainerResponse{
		Container: apiC,
	}, nil
}
```
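After CreateContainer() puts the StartTask into the tasks channel, it blocks, first on the task's error channel and then on StartResponse, until the result comes back.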
## worker

daemon() in main.go creates 10 workers:

```go
for i := 0; i < 10; i++ {
	wg.Add(1)
	w := supervisor.NewWorker(sv, wg)
	go w.Start()
}
```
A worker processes the startTask values in the supervisor's startTasks channel. It is defined in /supervisor/worker.go:

```go
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
	return &worker{
		s:  s,
		wg: wg,
	}
}

type worker struct {
	wg *sync.WaitGroup
	s  *Supervisor
}
```
A worker is started with its Start() method:

```go
func (w *worker) Start() {
	defer w.wg.Done()
	for t := range w.s.startTasks {
		started := time.Now()
		process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
		if err != nil {
			logrus.WithFields(logrus.Fields{
				"error": err,
				"id":    t.Container.ID(),
			}).Error("containerd: start container")
			t.Err <- err
			evt := &DeleteTask{
				ID:      t.Container.ID(),
				NoEvent: true,
				Process: process,
			}
			w.s.SendTask(evt)
			continue
		}
		if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited {
			if process.State() != runtime.Stopped {
				logrus.WithField("error", err).Error("containerd: notify OOM events")
			}
		}
		if err := w.s.monitorProcess(process); err != nil {
			logrus.WithField("error", err).Error("containerd: add process to monitor")
			t.Err <- err
			evt := &DeleteTask{
				ID:      t.Container.ID(),
				NoEvent: true,
				Process: process,
			}
			w.s.SendTask(evt)
			continue
		}
		if t.CheckpointPath == "" {
			if err := process.Start(); err != nil {
				logrus.WithField("error", err).Error("containerd: start init process")
				t.Err <- err
				evt := &DeleteTask{
					ID:      t.Container.ID(),
					NoEvent: true,
					Process: process,
				}
				w.s.SendTask(evt)
				continue
			}
		}
		ContainerStartTimer.UpdateSince(started)
		t.Err <- nil
		t.StartResponse <- StartResponse{
			Container: t.Container,
		}
		w.s.notifySubscribers(Event{
			Timestamp: time.Now(),
			ID:        t.Container.ID(),
			Type:      StateStart,
		})
	}
}
```
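Start() consumes the contents of the supervisor's startTasks channel. For each startTask it mainly does the following:
- calls Container.Start() to create the container;
- calls monitor.MonitorOOM() and monitorProcess() to monitor the container process;
- calls process.Start() to start the container (only when no checkpoint path is set);
- sends the result back on the startTask's Err and StartResponse channels.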
## monitor

The monitor is responsible for watching container processes. It is defined in /supervisor/monitor_linux.go:

```go
type Monitor struct {
	m         sync.Mutex
	receivers map[int]interface{}
	exits     chan runtime.Process
	ooms      chan string
	epollFd   int
}

func NewMonitor() (*Monitor, error) {
	m := &Monitor{
		receivers: make(map[int]interface{}),
		exits:     make(chan runtime.Process, 1024),
		ooms:      make(chan string, 1024),
	}
	fd, err := archutils.EpollCreate1(0)
	if err != nil {
		return nil, err
	}
	m.epollFd = fd
	go m.start()
	return m, nil
}
```
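Where:
- receivers: maps each monitored file descriptor to the object being watched (a runtime.Process or a runtime.OOM);
- exits: the processes that have exited;
- ooms: the ids of containers that hit OOM.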
The monitor is started with its start() method:

```go
func (m *Monitor) start() {
	var events [128]syscall.EpollEvent
	for {
		n, err := archutils.EpollWait(m.epollFd, events[:], -1)
		if err != nil {
			if err == syscall.EINTR {
				continue
			}
			logrus.WithField("error", err).Fatal("containerd: epoll wait")
		}
		for i := 0; i < n; i++ {
			fd := int(events[i].Fd)
			m.m.Lock()
			r := m.receivers[fd]
			switch t := r.(type) {
			case runtime.Process:
				if events[i].Events == syscall.EPOLLHUP {
					delete(m.receivers, fd)
					if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
						Events: syscall.EPOLLHUP,
						Fd:     int32(fd),
					}); err != nil {
						logrus.WithField("error", err).Error("containerd: epoll remove fd")
					}
					if err := t.Close(); err != nil {
						logrus.WithField("error", err).Error("containerd: close process IO")
					}
					EpollFdCounter.Dec(1)
					m.exits <- t
				}
			case runtime.OOM:
				t.Flush()
				if t.Removed() {
					delete(m.receivers, fd)
					t.Close()
					EpollFdCounter.Dec(1)
				} else {
					m.ooms <- t.ContainerID()
				}
			}
			m.m.Unlock()
		}
	}
}
```
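The flow of start() is as follows:
- get the events through archutils.EpollWait();
- look up the receiver for each event's fd in receivers;
- if the receiver is a runtime.Process and the event is EPOLLHUP, put the process into the exits channel to signal that its exit FD has hung up;
- if the receiver is a runtime.OOM, put the container id into the ooms channel (unless the container has already been removed).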
The contents of the exits channel are available through the Exits() method, and the contents of the ooms channel through the OOMs() method:

```go
// Exits returns the channel used to notify of a process exit
func (m *Monitor) Exits() chan runtime.Process {
	return m.exits
}

// OOMs returns the channel used to notify of a container OOM
func (m *Monitor) OOMs() chan string {
	return m.ooms
}
```
The contents of ooms and exits are consumed by the supervisor's oomHandler() and exitHandler().

So how does the monitor register the events it listens for? Through the two methods Monitor() and MonitorOOM().
```go
// Monitor adds a process to the list of the one being monitored
func (m *Monitor) Monitor(p runtime.Process) error {
	m.m.Lock()
	defer m.m.Unlock()
	fd := p.ExitFD()
	// Define the EpollEvent.
	event := syscall.EpollEvent{
		Fd:     int32(fd),
		Events: syscall.EPOLLHUP,
	}
	// Register the event.
	if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
		return err
	}
	EpollFdCounter.Inc(1)
	m.receivers[fd] = p
	return nil
}

// MonitorOOM adds a container to the list of the ones monitored for OOM
func (m *Monitor) MonitorOOM(c runtime.Container) error {
	m.m.Lock()
	defer m.m.Unlock()
	o, err := c.OOM()
	if err != nil {
		return err
	}
	fd := o.FD()
	// Define the EpollEvent.
	event := syscall.EpollEvent{
		Fd:     int32(fd),
		Events: syscall.EPOLLHUP | syscall.EPOLLIN,
	}
	// Register the event.
	if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
		return err
	}
	EpollFdCounter.Inc(1)
	m.receivers[fd] = o
	return nil
}
```
Both methods follow the same flow:
- define the EpollEvent;
- register the event with EpollCtl;
- add the watched object to receivers.

For more on the epoll mechanism, see http://blog.csdn.net/xiajun07061225/article/details/9250579.
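## Summary

- containerd exposes its services over gRPC. Its internal core is a supervisor, built around two channels: tasks and startTasks. The supervisor automatically processes the contents of the tasks channel by calling a different handler for each task type. StartTask is handled by start(), which turns the StartTask into a startTask and puts it into the startTasks channel.
- Workers consume the contents of the startTasks channel. Through containerd-shim, a worker first creates the container with "runc create" and then starts it with "runc start".
- The monitor in containerd watches the running containers and reacts to their exit and OOM events.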