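Docker v1.12.3 ships two binaries: docker is the client and dockerd is the server. docker usually talks to the local dockerd over a Unix socket, and it can also reach a dockerd on another host over the network, provided that dockerd is listening on a network port. This article walks through how dockerd starts its API server and begins listening.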

dockerd

First, the dockerd startup flow. The entry point of dockerd is in /cmd/dockerd/docker.go:

var (
	daemonCli = NewDaemonCli()
	......
)

func main() {
	//***Call reexec.Init()***//
	if reexec.Init() {
		return
	}
	......
	if !stop {
		err = daemonCli.start()
		notifyShutdown(err)
		if err != nil {
			logrus.Fatal(err)
		}
	}
}

dockerd first calls NewDaemonCli() to create daemonCli, then calls daemonCli.start() to start it. So next we look at DaemonCli.

DaemonCli

DaemonCli is defined in /cmd/dockerd/daemon.go:

// DaemonCli represents the daemon CLI.
type DaemonCli struct {
	*daemon.Config
	commonFlags *cliflags.CommonFlags
	configFile  *string
	api         *apiserver.Server
	d           *daemon.Daemon
}

DaemonCli holds an api field, the API server (*apiserver.Server).
Now look at the startup method start():

//***Startup method of DaemonCli***//
func (cli *DaemonCli) start() (err error) {
	......
	//***Call apiserver.New() to create the server***//
	api := apiserver.New(serverConfig)
	cli.api = api
	//***Example: dockerd -H 0.0.0.0:4243 -H unix:///var/run/docker.sock &>> /var/log/docker.log &***//
	//***Create a TCP socket listener, a Unix socket listener, etc. from the -H flags and add them to api***//
	for i := 0; i < len(cli.Config.Hosts); i++ {
		......
		//***Initialize the listener***//
		ls, err := listeners.Init(proto, addr, serverConfig.SocketGroup, serverConfig.TLSConfig)
		......
		//***Add the listeners to api***//
		api.Accept(protoAddrParts[1], ls...)
	}
	......
	cli.initMiddlewares(api, serverConfig)
	initRouter(api, d, c)
	......
	// The serve API routine never exits unless an error occurs
	// We need to start it as a goroutine and wait on it so
	// daemon doesn't exit
	serveAPIWait := make(chan error)
	//***Start api***//
	go api.Wait(serveAPIWait)
	......
	return nil
}

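The server-related steps in start() are:

  1. apiserver.New() creates api, whose type is Server;
  2. a listener is created for each -H host according to its protocol;
  3. api.Accept() wraps each listener in an HTTPServer and adds it to api;
  4. initRouter() installs the routes;
  5. api.Wait() starts every HTTPServer held by api.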
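
Step 2 can be illustrated with a small standalone sketch. The excerpt elides how each -H value becomes proto and addr, so the splitting on "://" below is an assumption suggested by the protoAddrParts variable, and splitHost and listen are hypothetical helpers that only use the standard library, not dockerd's own functions.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// splitHost splits a -H style value such as "tcp://127.0.0.1:0" into its
// protocol and address. Hypothetical helper for this sketch only.
func splitHost(host string) (proto, addr string, err error) {
	parts := strings.SplitN(host, "://", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("bad format %q, expected PROTO://ADDR", host)
	}
	return parts[0], parts[1], nil
}

// listen opens a plain listener for the protocols this sketch supports.
// The real listeners.Init() also handles "fd", TLS and socket groups.
func listen(proto, addr string) (net.Listener, error) {
	switch proto {
	case "tcp", "unix":
		return net.Listen(proto, addr)
	default:
		return nil, fmt.Errorf("invalid protocol format: %q", proto)
	}
}

func main() {
	proto, addr, err := splitHost("tcp://127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	l, err := listen(proto, addr)
	if err != nil {
		panic(err)
	}
	defer l.Close()
	fmt.Println("listening on", l.Addr().Network(), l.Addr())
}
```

Port 0 asks the kernel for any free port, which keeps the sketch runnable without special privileges.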

A closer look at initRouter():

//***Initialize the routes***//
func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) {
	decoder := runconfig.ContainerDecoder{}
	//***Create the router of each module***//
	routers := []router.Router{
		container.NewRouter(d, decoder),
		image.NewRouter(d, decoder),
		systemrouter.NewRouter(d, c),
		volume.NewRouter(d),
		build.NewRouter(dockerfile.NewBuildManager(d)),
		swarmrouter.NewRouter(c),
	}
	if d.NetworkControllerEnabled() {
		routers = append(routers, network.NewRouter(d, c))
	}
	routers = addExperimentalRouters(routers)
	//***Hand the module routers to the server***//
	s.InitRouter(utils.IsDebugEnabled(), routers...)
}

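initRouter() creates the container, image, system, volume, build and swarm routers, adds the network router when the network controller is enabled, and then calls Server's InitRouter() to install the routes.

So next we look at HTTPServer and Server.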

HTTPServer

HTTPServer is defined in /api/server/server.go:

//***An http.Server together with a net.Listener forms a backend server***//
//***http.Server accepts connections from the net.Listener and passes each request to its handler, which may be a mux***//
type HTTPServer struct {
	srv *http.Server
	l   net.Listener
}

HTTPServer has Serve() and Close() methods:

// Serve starts listening for inbound requests.
func (s *HTTPServer) Serve() error {
	return s.srv.Serve(s.l)
}

// Close closes the HTTPServer from listening for the inbound requests.
func (s *HTTPServer) Close() error {
	return s.l.Close()
}

As shown, Serve() ties the http.Server to the net.Listener by calling http.Server's Serve() with that listener, and Close() stops listening by calling the net.Listener's Close().
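
To see this pairing in isolation, here is a minimal sketch that assumes nothing beyond the standard library. pairedServer is a hypothetical stand-in with the same two fields as HTTPServer; the sketch serves one request on a loopback port, then closes the listener and checks that Serve() returns.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// pairedServer mirrors the shape of HTTPServer: one http.Server, one listener.
type pairedServer struct {
	srv *http.Server
	l   net.Listener
}

func (s *pairedServer) Serve() error { return s.srv.Serve(s.l) }
func (s *pairedServer) Close() error { return s.l.Close() }

// roundTrip serves a single request and then closes the listener.
// It returns the response body, or an error if any step misbehaves.
func roundTrip() (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	s := &pairedServer{
		srv: &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			io.WriteString(w, "pong")
		})},
		l: l,
	}
	done := make(chan error, 1)
	go func() { done <- s.Serve() }() // Serve blocks, so run it in a goroutine

	resp, err := http.Get("http://" + l.Addr().String() + "/ping")
	if err != nil {
		return "", err
	}
	data, err := io.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		return "", err
	}

	s.Close() // closing the listener makes Serve return a non-nil error
	if serveErr := <-done; serveErr == nil {
		return "", fmt.Errorf("expected Serve to return an error after Close")
	}
	return string(data), nil
}

func main() {
	body, err := roundTrip()
	fmt.Println(body, err)
}
```

Closing only the listener stops new connections from being accepted; it does not wait for requests already in flight.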

Next is makeHTTPHandler(). Note that it is a method of Server, not of HTTPServer:

func (s *Server) makeHTTPHandler(handler httputils.APIFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Define the context that we'll pass around to share info
		// like the docker-request-id.
		//
		// The 'context' will be used for global data that should
		// apply to all requests. Data that is specific to the
		// immediate function being called should still be passed
		// as 'args' on the function call.
		ctx := context.Background()
		handlerFunc := s.handleWithGlobalMiddlewares(handler)
		vars := mux.Vars(r)
		if vars == nil {
			vars = make(map[string]string)
		}
		if err := handlerFunc(ctx, w, r, vars); err != nil {
			logrus.Errorf("Handler for %s %s returned error: %v", r.Method, r.URL.Path, err)
			httputils.MakeErrorHandler(err)(w, r)
		}
	}
}

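dockerd handler functions have the signature (ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error. makeHTTPHandler() converts such a function into a handler that http.Server can use: it returns an http.HandlerFunc taking (w http.ResponseWriter, r *http.Request), and if the wrapped function returns an error it logs the error and writes an error response.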
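
The adapter idea can be tried out with the standard library alone. In this sketch apiFunc only imitates the signature described above, adapt is a hypothetical simplification of makeHTTPHandler() with no middlewares, and a fixed map stands in for the mux.Vars(r) lookup.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// apiFunc imitates the handler signature used by dockerd.
type apiFunc func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error

// adapt turns an apiFunc into a standard http.HandlerFunc. The vars map is
// passed in directly here; the real code obtains it from mux.Vars(r).
func adapt(h apiFunc, vars map[string]string) http.HandlerFunc {
	if vars == nil {
		vars = map[string]string{}
	}
	return func(w http.ResponseWriter, r *http.Request) {
		if err := h(context.Background(), w, r, vars); err != nil {
			// Simplified error path: always answer with status 500.
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	}
}

func showName(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	_, err := fmt.Fprintf(w, "container=%s", vars["name"])
	return err
}

func alwaysFails(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	return errors.New("boom")
}

// call runs one GET request through h and returns the status code and body.
func call(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("GET", path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(call(adapt(showName, map[string]string{"name": "web"}), "/containers/web/json"))
	fmt.Println(call(adapt(alwaysFails, nil), "/containers/web/json"))
}
```

Returning an error from the handler and converting it to a response in one place keeps the individual handlers free of response-writing boilerplate.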

Server

Server is defined in /api/server/server.go:

// Server contains instance details for the server
type Server struct {
	cfg           *Config
	servers       []*HTTPServer
	routers       []router.Router
	routerSwapper *routerSwapper
	middlewares   []middleware.Middleware
}

As shown, Server holds multiple HTTPServers and Routers.

First, Server's Accept() method:

//***Wrap each listener in an HTTPServer and append it to servers***//
func (s *Server) Accept(addr string, listeners ...net.Listener) {
	for _, listener := range listeners {
		httpServer := &HTTPServer{
			srv: &http.Server{
				Addr: addr,
			},
			l: listener,
		}
		s.servers = append(s.servers, httpServer)
	}
}

Accept() wraps each listener in an HTTPServer and appends it to the servers field of Server.

Next, Server's InitRouter():

// InitRouter initializes the list of routers for the server.
// This method also enables the Go profiler if enableProfiler is true.
func (s *Server) InitRouter(enableProfiler bool, routers ...router.Router) {
	for _, r := range routers {
		s.routers = append(s.routers, r)
	}
	m := s.createMux()
	if enableProfiler {
		profilerSetup(m)
	}
	s.routerSwapper = &routerSwapper{
		router: m,
	}
}

// createMux initializes the main router the server uses.
// For example:
//
//	r := mux.NewRouter()
//	r.Path("/products/").Handler(ProductsHandler)
//	r.Path("/products/{key}").Handler(ProductsHandler)
//	r.Path("/articles/{category}/{id:[0-9]+}").
//		Handler(ArticleHandler)
func (s *Server) createMux() *mux.Router {
	//***Standard usage***//
	m := mux.NewRouter()
	logrus.Debug("Registering routers")
	for _, apiRouter := range s.routers {
		for _, r := range apiRouter.Routes() {
			f := s.makeHTTPHandler(r.Handler())
			logrus.Debugf("Registering %s, %s", r.Method(), r.Path())
			//***Bind the path to its handler***//
			//***Standard usage of the third-party package***//
			m.Path(versionMatcher + r.Path()).Methods(r.Method()).Handler(f)
			m.Path(r.Path()).Methods(r.Method()).Handler(f)
		}
	}
	err := errors.NewRequestNotFoundError(fmt.Errorf("page not found"))
	notFoundHandler := httputils.MakeErrorHandler(err)
	m.HandleFunc(versionMatcher+"/{path:.*}", notFoundHandler)
	m.NotFoundHandler = notFoundHandler
	return m
}

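InitRouter() builds the mux from the route information. It creates the router with mux.NewRouter() from the "github.com/gorilla/mux" package and installs each route with m.Path(r.Path()).Methods(r.Method()).Handler(f), where f is the handler produced by makeHTTPHandler(). Each route is registered twice, once under the versionMatcher prefix and once without it, so versioned and unversioned request paths reach the same handler. A route therefore carries a path, a method and a handler; these are covered below.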
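
The effect of the double registration can be imitated without gorilla/mux. The sketch below is a deliberately different technique: instead of registering every route twice, it strips an optional version prefix before an exact method-and-path lookup. The route type, newMux and the versionPrefix pattern are all assumptions made for illustration; the value of versionMatcher is not shown in the excerpt.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"regexp"
)

// route is a hypothetical, simplified stand-in for router.Route.
type route struct {
	method  string
	path    string
	handler http.HandlerFunc
}

// versionPrefix is an assumed pattern standing in for versionMatcher.
var versionPrefix = regexp.MustCompile(`^/v[0-9.]+`)

// newMux returns a handler that resolves exact method+path pairs and
// accepts each path with or without a leading version segment.
func newMux(routes []route) http.Handler {
	table := map[string]http.HandlerFunc{}
	for _, r := range routes {
		table[r.method+" "+r.path] = r.handler
	}
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		path := versionPrefix.ReplaceAllString(req.URL.Path, "")
		if h, ok := table[req.Method+" "+path]; ok {
			h(w, req)
			return
		}
		http.Error(w, "page not found", http.StatusNotFound)
	})
}

// demoMux builds a mux with a single GET /version route.
func demoMux() http.Handler {
	return newMux([]route{{
		method: "GET",
		path:   "/version",
		handler: func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, "1.12.3")
		},
	}})
}

// status sends one request through h and returns the response code.
func status(h http.Handler, method, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return rec.Code
}

func main() {
	m := demoMux()
	fmt.Println(status(m, "GET", "/version"))
	fmt.Println(status(m, "GET", "/v1.24/version"))
	fmt.Println(status(m, "POST", "/version"))
}
```

Unlike gorilla/mux, this lookup does not support path variables such as {name:.*}; it only shows how versioned and unversioned paths can share one handler.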

With the routes installed, it is time to start the HTTPServers held by Server. This is done by the Wait() method:

//***Call serveAPI() to start serving***//
func (s *Server) Wait(waitChan chan error) {
	if err := s.serveAPI(); err != nil {
		logrus.Errorf("ServeAPI error: %v", err)
		waitChan <- err
		return
	}
	waitChan <- nil
}

Wait() calls serveAPI():

//***Start the servers***//
func (s *Server) serveAPI() error {
	var chErrors = make(chan error, len(s.servers))
	//***Each server listens on one address***//
	for _, srv := range s.servers {
		//***Set the server's handler***//
		srv.srv.Handler = s.routerSwapper
		//***Start the server in a goroutine***//
		go func(srv *HTTPServer) {
			var err error
			logrus.Infof("API listen on %s", srv.l.Addr())
			if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
				err = nil
			}
			chErrors <- err
		}(srv)
	}
	//***Block until every server has returned; len(s.servers) nils mean they all stopped cleanly***//
	for i := 0; i < len(s.servers); i++ {
		err := <-chErrors
		if err != nil {
			return err
		}
	}
	return nil
}

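serveAPI() starts every HTTPServer in its own goroutine. Because srv.Serve() blocks for as long as the server is running, each goroutine sends to the channel only when its server stops: either the error it returned, or nil when that error is just the "use of closed network connection" produced by closing the listener. The main flow then reads len(s.servers) values from the channel and returns the first non-nil error immediately; reading that many nils means every HTTPServer has shut down cleanly, not merely that it started. This is also why start() runs api.Wait() in a goroutine. Personally, I find sync.WaitGroup more intuitive for this.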
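
The channel pattern is easy to reproduce outside dockerd. The following is a minimal sketch in which runAll, okTask and failTask are hypothetical names, with plain functions standing in for the blocking Serve() calls.

```go
package main

import (
	"errors"
	"fmt"
)

// runAll starts every task in its own goroutine and blocks until all of
// them have returned nil or the first one reports an error.
func runAll(tasks []func() error) error {
	// Buffered to len(tasks) so that goroutines finishing after an early
	// return can still send without blocking forever.
	errs := make(chan error, len(tasks))
	for _, t := range tasks {
		go func(t func() error) { errs <- t() }(t)
	}
	for range tasks {
		if err := <-errs; err != nil {
			return err
		}
	}
	return nil
}

var errBoom = errors.New("listener failed")

func okTask() error   { return nil }
func failTask() error { return errBoom }

func main() {
	fmt.Println(runAll([]func() error{okTask, okTask}))
	fmt.Println(runAll([]func() error{okTask, failTask}))
}
```

A sync.WaitGroup would also wait for all goroutines, but on its own it carries no error values and cannot return on the first failure, which is what the error channel provides here.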

localRoute

Now for the routes. The route type used is localRoute, defined in /api/server/router/local.go:

type localRoute struct {
	method  string
	path    string
	handler httputils.APIFunc
}

localRoute has method, path and handler fields, that is, the HTTP method, the URL path and the handler function. For convenience, the same file also defines:

// NewGetRoute initializes a new route with the http method GET.
func NewGetRoute(path string, handler httputils.APIFunc) Route {
	return NewRoute("GET", path, handler)
}

// NewPostRoute initializes a new route with the http method POST.
func NewPostRoute(path string, handler httputils.APIFunc) Route {
	return NewRoute("POST", path, handler)
}

// NewPutRoute initializes a new route with the http method PUT.
func NewPutRoute(path string, handler httputils.APIFunc) Route {
	return NewRoute("PUT", path, handler)
}

// NewDeleteRoute initializes a new route with the http method DELETE.
func NewDeleteRoute(path string, handler httputils.APIFunc) Route {
	return NewRoute("DELETE", path, handler)
}
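
How a struct like localRoute can sit behind the Method(), Path() and Handler() accessors that createMux() calls is sketched below. This is an illustration under assumptions: handlerFunc is a simplified placeholder for httputils.APIFunc, and the bodies of NewRoute and the accessor methods are guesses at the obvious implementation, since the excerpt does not show them.

```go
package main

import "fmt"

// handlerFunc is a simplified placeholder for httputils.APIFunc.
type handlerFunc func(vars map[string]string) string

// Route is the read-only view of one entry in a router.
type Route interface {
	Method() string
	Path() string
	Handler() handlerFunc
}

// localRoute stores the three pieces of information a route needs.
type localRoute struct {
	method  string
	path    string
	handler handlerFunc
}

func (l localRoute) Method() string       { return l.method }
func (l localRoute) Path() string         { return l.path }
func (l localRoute) Handler() handlerFunc { return l.handler }

// NewRoute builds a route for an arbitrary HTTP method.
func NewRoute(method, path string, h handlerFunc) Route {
	return localRoute{method: method, path: path, handler: h}
}

// NewGetRoute is the GET shorthand, in the style of the helpers above.
func NewGetRoute(path string, h handlerFunc) Route {
	return NewRoute("GET", path, h)
}

// describe renders a route as "METHOD path".
func describe(r Route) string { return r.Method() + " " + r.Path() }

func inspect(vars map[string]string) string { return "inspect " + vars["name"] }

func main() {
	r := NewGetRoute("/containers/{name:.*}/json", inspect)
	fmt.Println(describe(r))
	fmt.Println(r.Handler()(map[string]string{"name": "web"}))
}
```

Exposing routes through an interface lets wrappers such as router.Cancellable() decorate a route without the mux code needing to know about it.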

As for how localRoutes are created, take container as an example. Its routes are defined in /api/server/router/container/container.go:

// NewRouter initializes a new container router
func NewRouter(b Backend, decoder httputils.ContainerDecoder) router.Router {
	r := &containerRouter{
		backend: b,
		decoder: decoder,
	}
	r.initRoutes()
	return r
}

// initRoutes initializes the routes in container router
func (r *containerRouter) initRoutes() {
	r.routes = []router.Route{
		// HEAD
		router.NewHeadRoute("/containers/{name:.*}/archive", r.headContainersArchive),
		// GET
		router.NewGetRoute("/containers/json", r.getContainersJSON),
		router.NewGetRoute("/containers/{name:.*}/export", r.getContainersExport),
		router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
		router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
		router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
		router.Cancellable(router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs)),
		router.Cancellable(router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats)),
		router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
		router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
		router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),
		// POST
		router.NewPostRoute("/containers/create", r.postContainersCreate),
		router.NewPostRoute("/containers/{name:.*}/kill", r.postContainersKill),
		router.NewPostRoute("/containers/{name:.*}/pause", r.postContainersPause),
		router.NewPostRoute("/containers/{name:.*}/unpause", r.postContainersUnpause),
		router.NewPostRoute("/containers/{name:.*}/restart", r.postContainersRestart),
		router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),
		router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),
		router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait),
		router.NewPostRoute("/containers/{name:.*}/resize", r.postContainersResize),
		router.NewPostRoute("/containers/{name:.*}/attach", r.postContainersAttach),
		router.NewPostRoute("/containers/{name:.*}/copy", r.postContainersCopy), // Deprecated since 1.8, Errors out since 1.12
		router.NewPostRoute("/containers/{name:.*}/exec", r.postContainerExecCreate),
		router.NewPostRoute("/exec/{name:.*}/start", r.postContainerExecStart),
		router.NewPostRoute("/exec/{name:.*}/resize", r.postContainerExecResize),
		router.NewPostRoute("/containers/{name:.*}/rename", r.postContainerRename),
		router.NewPostRoute("/containers/{name:.*}/update", r.postContainerUpdate),
		// PUT
		router.NewPutRoute("/containers/{name:.*}/archive", r.putContainersArchive),
		// DELETE
		router.NewDeleteRoute("/containers/{name:.*}", r.deleteContainers),
	}
}

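This ties back to initRouter() in daemon.go, which calls container.NewRouter() and hands the resulting router to Server's InitRouter().

We have now covered route initialization and, before that, the startup of Server, so the analysis of the dockerd listening flow is nearly complete. What remains is how the listeners served by HTTPServer are created.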

listener

According to DaemonCli's start(), the listeners come from "ls, err := listeners.Init(proto, addr, serverConfig.SocketGroup, serverConfig.TLSConfig)".
So look at Init() in /pkg/listeners/listeners_unix.go:

func Init(proto, addr, socketGroup string, tlsConfig *tls.Config) ([]net.Listener, error) {
	ls := []net.Listener{}
	switch proto {
	case "fd":
		fds, err := listenFD(addr, tlsConfig)
		if err != nil {
			return nil, err
		}
		ls = append(ls, fds...)
	case "tcp":
		//***Create the TCP socket***//
		l, err := sockets.NewTCPSocket(addr, tlsConfig)
		if err != nil {
			return nil, err
		}
		ls = append(ls, l)
	case "unix":
		l, err := sockets.NewUnixSocket(addr, socketGroup)
		if err != nil {
			return nil, fmt.Errorf("can't create unix socket %s: %v", addr, err)
		}
		ls = append(ls, l)
	default:
		return nil, fmt.Errorf("invalid protocol format: %q", proto)
	}
	return ls, nil
}

Taking unix as an example, sockets.NewUnixSocket() is defined in /vendor/src/github.com/docker/go-connections/sockets/unix_socket.go:

// NewUnixSocket creates a unix socket with the specified path and group.
func NewUnixSocket(path, group string) (net.Listener, error) {
	if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	mask := syscall.Umask(0777)
	defer syscall.Umask(mask)
	l, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}
	if err := setSocketGroup(path, group); err != nil {
		l.Close()
		return nil, err
	}
	if err := os.Chmod(path, 0660); err != nil {
		l.Close()
		return nil, err
	}
	return l, nil
}

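As shown, the unix socket is ultimately created by net.Listen(); the surrounding code removes any stale socket file first and then sets the socket's group and its permissions to 0660.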
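
For readers who want to try a Unix socket end to end, here is a minimal sketch that serves HTTP on a socket in a temporary directory and queries it with a client that dials the same socket. It assumes a Unix-like system; pingOverUnixSocket, the socket file name and the "/_ping" path are arbitrary choices for the example, and the host name "unix" in the URL is a dummy that the custom dialer ignores.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
)

// pingOverUnixSocket serves one endpoint on a Unix socket and fetches it
// through an http.Client whose transport dials that socket.
func pingOverUnixSocket() (string, error) {
	dir, err := os.MkdirTemp("", "sock")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "demo.sock")

	l, err := net.Listen("unix", path)
	if err != nil {
		return "", err
	}
	defer l.Close()
	// Restrict access to owner and group, as NewUnixSocket does.
	if err := os.Chmod(path, 0660); err != nil {
		return "", err
	}

	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "OK")
	})}
	go srv.Serve(l)

	client := &http.Client{Transport: &http.Transport{
		// Ignore the network and address derived from the URL and
		// always connect to the socket file instead.
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", path)
		},
	}}
	resp, err := client.Get("http://unix/_ping")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := pingOverUnixSocket()
	fmt.Println(body, err)
}
```

Because access is controlled by file permissions on the socket path, a Unix socket needs no network port and is only reachable from the local machine.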

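Summary

In general, writing a server consists of the following steps:

  1. create the listener;
  2. create and initialize the routes, or build a handler directly;
  3. create the server and attach the routes or handler to it;
  4. have the server start serving on the listener.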
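
The four steps, applied to several listeners sharing one handler as dockerd's Server does, might look like the following sketch. It uses only the standard library; serveOnAll, pingAll and the "/_ping" path are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// serveOnAll takes ready listeners (step 1), builds a mux (step 2),
// creates one http.Server per listener sharing that mux (step 3) and
// starts serving each listener in its own goroutine (step 4).
func serveOnAll(listeners []net.Listener) {
	mux := http.NewServeMux()
	mux.HandleFunc("/_ping", func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "OK")
	})
	for _, l := range listeners {
		srv := &http.Server{Handler: mux}
		go srv.Serve(l)
	}
}

// pingAll opens n loopback listeners, serves on them and pings each one.
func pingAll(n int) ([]string, error) {
	var ls []net.Listener
	for i := 0; i < n; i++ {
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			return nil, err
		}
		defer l.Close() // closed when pingAll returns
		ls = append(ls, l)
	}
	serveOnAll(ls)

	var out []string
	for _, l := range ls {
		resp, err := http.Get("http://" + l.Addr().String() + "/_ping")
		if err != nil {
			return nil, err
		}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return nil, err
		}
		out = append(out, string(body))
	}
	return out, nil
}

func main() {
	fmt.Println(pingAll(2))
}
```

Since net.Listen() has already bound each port before serveOnAll() is called, the client requests can connect right away even if the serving goroutines have not been scheduled yet.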