Docker's cp command copies content between a container and the host. This analysis walks through how the command is implemented.

Client side

On the Docker client side, the cp command is executed by runCopy(), which is defined in /api/client/container/cp.go:

func runCopy(dockerCli *client.DockerCli, opts copyOptions) error {
    srcContainer, srcPath := splitCpArg(opts.source)
    dstContainer, dstPath := splitCpArg(opts.destination)
    var direction copyDirection
    if srcContainer != "" {
        direction |= fromContainer
    }
    if dstContainer != "" {
        direction |= toContainer
    }
    cpParam := &cpConfig{
        followLink: opts.followLink,
    }
    ctx := context.Background()
    switch direction {
    case fromContainer:
        return copyFromContainer(ctx, dockerCli, srcContainer, srcPath, dstPath, cpParam)
    case toContainer:
        return copyToContainer(ctx, dockerCli, srcPath, dstContainer, dstPath, cpParam)
    case acrossContainers:
        // Copying between containers isn't supported.
        return fmt.Errorf("copying between containers is not supported")
    default:
        // User didn't specify any container.
        return fmt.Errorf("must specify at least one container source")
    }
}

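runCopy() inspects the arguments of the cp command: if a container appears in the source, it calls copyFromContainer(); if a container appears in the destination, it calls copyToContainer(). Copying between two containers, or with no container at all, is rejected with an error.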
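The direction logic can be tried in isolation. The following is a minimal sketch, not the Docker source: splitArg is a simplified, hypothetical stand-in for splitCpArg() (it ignores Windows paths), and direction() is a helper invented here to wrap the two flag checks.

```go
package main

import (
	"fmt"
	"strings"
)

// copyDirection is a bit-flag type, mirroring the idea used by runCopy().
type copyDirection int

const (
	fromContainer copyDirection = 1 << iota
	toContainer
	acrossContainers = fromContainer | toContainer
)

// splitArg is a simplified, hypothetical stand-in for splitCpArg():
// "name:/path" yields ("name", "/path"); anything that looks like a
// local path yields ("", arg).
func splitArg(arg string) (container, path string) {
	if strings.HasPrefix(arg, "/") || strings.HasPrefix(arg, ".") {
		return "", arg
	}
	parts := strings.SplitN(arg, ":", 2)
	if len(parts) == 1 {
		return "", arg
	}
	return parts[0], parts[1]
}

// direction sets one flag per side that names a container, which is
// the value the switch statement in runCopy() branches on.
func direction(src, dst string) copyDirection {
	var d copyDirection
	if c, _ := splitArg(src); c != "" {
		d |= fromContainer
	}
	if c, _ := splitArg(dst); c != "" {
		d |= toContainer
	}
	return d
}

func main() {
	fmt.Println(direction("web:/etc/hosts", "./hosts") == fromContainer)
	fmt.Println(direction("./hosts", "web:/tmp") == toContainer)
	fmt.Println(direction("a:/x", "b:/y") == acrossContainers)
}
```

Because the two flags occupy different bits, "both sides name a container" falls out as their bitwise OR, so a single switch can cover all four cases.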

copyFromContainer() copies content out of a container to the host. It is defined as follows:

//***Copy from the container***//
func copyFromContainer(ctx context.Context, dockerCli *client.DockerCli, srcContainer, srcPath, dstPath string, cpParam *cpConfig) (err error) {
    ......
    //***Fetch the content from the container***//
    content, stat, err := dockerCli.Client().CopyFromContainer(ctx, srcContainer, srcPath)
    if err != nil {
        return err
    }
    defer content.Close()
    ......
    // Prepare source copy info.
    srcInfo := archive.CopyInfo{
        Path:       srcPath,
        Exists:     true,
        IsDir:      stat.Mode.IsDir(),
        RebaseName: rebaseName,
    }
    preArchive := content
    if len(srcInfo.RebaseName) != 0 {
        _, srcBase := archive.SplitPathDirEntry(srcInfo.Path)
        preArchive = archive.RebaseArchiveEntries(content, srcBase, srcInfo.RebaseName)
    }
    // See comments in the implementation of `archive.CopyTo` for exactly what
    // goes into deciding how and whether the source archive needs to be
    // altered for the correct copy behavior.
    //***Unpack the content into dstPath***//
    return archive.CopyTo(preArchive, srcInfo, dstPath)
}

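copyFromContainer() first calls the client's CopyFromContainer() to obtain the content as a tar stream, then calls CopyTo() from the archive package to unpack that stream into dstPath. The archive package, including CopyTo(), will be covered in the next analysis.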
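To make "unpack a tar stream into a directory" concrete, here is a small sketch built only on Go's standard archive/tar package. It is not how archive.CopyTo() is implemented; makeTar, extractTo and demoExtract are names invented for this illustration, and only regular files and directories are handled.

```go
package main

import (
	"archive/tar"
	"bytes"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// makeTar builds a one-file archive in memory; it stands in for the
// stream that the daemon would return.
func makeTar(name, body string) (io.Reader, error) {
	var buf bytes.Buffer
	tw := tar.NewWriter(&buf)
	hdr := &tar.Header{Name: name, Mode: 0644, Size: int64(len(body)), Typeflag: tar.TypeReg}
	if err := tw.WriteHeader(hdr); err != nil {
		return nil, err
	}
	if _, err := tw.Write([]byte(body)); err != nil {
		return nil, err
	}
	if err := tw.Close(); err != nil {
		return nil, err
	}
	return &buf, nil
}

// extractTo unpacks regular files and directories from r into dstDir.
func extractTo(r io.Reader, dstDir string) error {
	root := filepath.Clean(dstDir)
	tr := tar.NewReader(r)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		target := filepath.Join(root, hdr.Name)
		// Refuse entries that would land outside dstDir (e.g. "../x").
		if target != root && !strings.HasPrefix(target, root+string(os.PathSeparator)) {
			return fmt.Errorf("illegal path in archive: %q", hdr.Name)
		}
		switch hdr.Typeflag {
		case tar.TypeDir:
			if err := os.MkdirAll(target, 0755); err != nil {
				return err
			}
		case tar.TypeReg:
			if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
				return err
			}
			f, err := os.Create(target)
			if err != nil {
				return err
			}
			if _, err := io.Copy(f, tr); err != nil {
				f.Close()
				return err
			}
			if err := f.Close(); err != nil {
				return err
			}
		}
	}
}

// demoExtract round-trips one file through a tar stream and returns
// the content found on disk afterwards.
func demoExtract() (string, error) {
	dir, err := os.MkdirTemp("", "cp-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	r, err := makeTar("hosts", "127.0.0.1 localhost\n")
	if err != nil {
		return "", err
	}
	if err := extractTo(r, dir); err != nil {
		return "", err
	}
	data, err := os.ReadFile(filepath.Join(dir, "hosts"))
	return string(data), err
}

func main() {
	fmt.Println(demoExtract())
}
```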

copyToContainer() copies content from the host into a container. It is defined as follows:

func copyToContainer(ctx context.Context, dockerCli *client.DockerCli, srcPath, dstContainer, dstPath string, cpParam *cpConfig) (err error) {
    ......
    if srcPath == "-" {
        ......
    } else {
        // Prepare source copy info.
        srcInfo, err := archive.CopyInfoSourcePath(srcPath, cpParam.followLink)
        if err != nil {
            return err
        }
        //***Call TarResource()***//
        srcArchive, err := archive.TarResource(srcInfo)
        if err != nil {
            return err
        }
        defer srcArchive.Close()
        ......
        dstDir, preparedArchive, err := archive.PrepareArchiveCopy(srcArchive, srcInfo, dstInfo)
        if err != nil {
            return err
        }
        defer preparedArchive.Close()
        resolvedDstPath = dstDir
        content = preparedArchive
    }
    ......
    return dockerCli.Client().CopyToContainer(ctx, dstContainer, resolvedDstPath, content, options)
}

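copyToContainer() calls TarResource() from the archive package to pack the source files, then calls the client's CopyToContainer() to send the resulting tar stream into the container. This is where the archive package's TarResource() function comes in.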
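The packing step can be illustrated with the standard library alone. This sketch is an assumption-laden simplification: tarFile and demoTar are hypothetical names, only a single regular file is handled, and the archive is buffered in memory rather than streamed as a real implementation presumably would.

```go
package main

import (
	"archive/tar"
	"bytes"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// tarFile packs the single regular file at path into a tar stream.
func tarFile(path string) (io.Reader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	info, err := f.Stat()
	if err != nil {
		return nil, err
	}
	// FileInfoHeader fills in name (base name only), size and mode.
	hdr, err := tar.FileInfoHeader(info, "")
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	tw := tar.NewWriter(&buf)
	if err := tw.WriteHeader(hdr); err != nil {
		return nil, err
	}
	if _, err := io.Copy(tw, f); err != nil {
		return nil, err
	}
	if err := tw.Close(); err != nil {
		return nil, err
	}
	return &buf, nil
}

// demoTar writes a temporary file, packs it, and reads back the first
// header of the resulting archive.
func demoTar() (string, int64, error) {
	dir, err := os.MkdirTemp("", "cp-sketch")
	if err != nil {
		return "", 0, err
	}
	defer os.RemoveAll(dir)
	src := filepath.Join(dir, "init_ssh.sh")
	if err := os.WriteFile(src, []byte("#!/bin/sh\n"), 0644); err != nil {
		return "", 0, err
	}
	r, err := tarFile(src)
	if err != nil {
		return "", 0, err
	}
	hdr, err := tar.NewReader(r).Next()
	if err != nil {
		return "", 0, err
	}
	return hdr.Name, hdr.Size, nil
}

func main() {
	fmt.Println(demoTar())
}
```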

engine-api side

The engine-api client defines CopyFromContainer() and CopyToContainer(), both in /docker/engine-api/client/container_copy.go.
CopyFromContainer() is defined as follows:

// CopyFromContainer gets the content from the container and returns it as a Reader
// to manipulate it in the host. It's up to the caller to close the reader.
func (cli *Client) CopyFromContainer(ctx context.Context, container, srcPath string) (io.ReadCloser, types.ContainerPathStat, error) {
    query := make(url.Values, 1)
    query.Set("path", filepath.ToSlash(srcPath)) // Normalize the paths used in the API.
    apiPath := fmt.Sprintf("/containers/%s/archive", container)
    response, err := cli.get(ctx, apiPath, query, nil)
    if err != nil {
        return nil, types.ContainerPathStat{}, err
    }
    if response.statusCode != http.StatusOK {
        return nil, types.ContainerPathStat{}, fmt.Errorf("unexpected status code from daemon: %d", response.statusCode)
    }
    ......
    stat, err := getContainerPathStatFromHeader(response.header)
    if err != nil {
        return nil, stat, fmt.Errorf("unable to get resource stat from response: %s", err)
    }
    return response.body, stat, err
}

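CopyFromContainer() sends a "GET" request to dockerd at the path "/containers/container-name/archive". The file (or directory) to copy is passed in the path query parameter; the response body is the tar stream, and the stat information is read from the response headers.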
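As a quick illustration of what the request target looks like, the sketch below rebuilds the path and query with the same standard-library calls used in the listing. archiveURL is a helper name made up for this example; the real client passes the path and query separately to cli.get().

```go
package main

import (
	"fmt"
	"net/url"
	"path/filepath"
)

// archiveURL joins the API path and the encoded query into one string,
// purely so the result can be inspected.
func archiveURL(container, srcPath string) string {
	query := make(url.Values, 1)
	// ToSlash turns OS-specific separators into "/" before sending.
	query.Set("path", filepath.ToSlash(srcPath))
	return fmt.Sprintf("/containers/%s/archive?%s", container, query.Encode())
}

func main() {
	fmt.Println(archiveURL("web", "/etc/hosts"))
}
```

Note that url.Values.Encode() percent-encodes the slashes in the path value, so the query carries path=%2Fetc%2Fhosts rather than a literal /etc/hosts.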

CopyToContainer() is defined as follows:

//***Copy into the container***//
func (cli *Client) CopyToContainer(ctx context.Context, container, path string, content io.Reader, options types.CopyToContainerOptions) error {
    query := url.Values{}
    query.Set("path", filepath.ToSlash(path)) // Normalize the paths used in the API.
    // Do not allow for an existing directory to be overwritten by a non-directory and vice versa.
    if !options.AllowOverwriteDirWithFile {
        query.Set("noOverwriteDirNonDir", "true")
    }
    apiPath := fmt.Sprintf("/containers/%s/archive", container)
    response, err := cli.putRaw(ctx, apiPath, query, content, nil)
    if err != nil {
        return err
    }
    defer ensureReaderClosed(response)
    if response.statusCode != http.StatusOK {
        return fmt.Errorf("unexpected status code from daemon: %d", response.statusCode)
    }
    return nil
}

CopyToContainer() sends the tar stream with the "PUT" method to the path "/containers/container-name/archive" on dockerd.
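The PUT round trip can be exercised without a daemon. In this sketch a throwaway net/http/httptest server stands in for dockerd, and putArchive and demoPut are hypothetical helpers; the real client goes through cli.putRaw() and its own transport rather than http.DefaultClient.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// putArchive sends content with PUT to <base>/containers/<name>/archive.
func putArchive(base, container, path string, content io.Reader) (int, error) {
	query := url.Values{}
	query.Set("path", path)
	query.Set("noOverwriteDirNonDir", "true")
	target := fmt.Sprintf("%s/containers/%s/archive?%s", base, container, query.Encode())
	req, err := http.NewRequest(http.MethodPut, target, content)
	if err != nil {
		return 0, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

// demoPut runs putArchive against a test server and reports the status
// code together with a summary of what the server received.
func demoPut() (int, string, error) {
	seen := make(chan string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		seen <- r.Method + " " + r.URL.Path + "?path=" + r.URL.Query().Get("path") + " body=" + string(body)
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()
	status, err := putArchive(srv.URL, "web", "/tmp", strings.NewReader("tar-bytes"))
	if err != nil {
		return 0, "", err
	}
	return status, <-seen, nil
}

func main() {
	fmt.Println(demoPut())
}
```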

dockerd side

In dockerd, "GET" requests to the path "/containers/container-name/archive" are handled by the getContainersArchive() method, and "PUT" requests to the same path are handled by the putContainersArchive() method.

getContainersArchive()

getContainersArchive() is defined in /api/server/router/container/copy.go:

func (s *containerRouter) getContainersArchive(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    v, err := httputils.ArchiveFormValues(r, vars)
    if err != nil {
        return err
    }
    //***Call the Daemon's ContainerArchivePath()***//
    tarArchive, stat, err := s.backend.ContainerArchivePath(v.Name, v.Path)
    if err != nil {
        return err
    }
    defer tarArchive.Close()
    if err := setContainerPathStatHeader(stat, w.Header()); err != nil {
        return err
    }
    w.Header().Set("Content-Type", "application/x-tar")
    //***Copy the content of tarArchive into the http.ResponseWriter***//
    _, err = io.Copy(w, tarArchive)
    return err
}

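getContainersArchive() parses from the request which file (or directory) in the container is to be copied, calls the daemon's ContainerArchivePath() to pack it into a tar stream, and then writes that stream via io.Copy() into the ResponseWriter, that is, into the response body.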
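The "stream a reader into the response body" pattern looks roughly like the sketch below. archiveHandler and demoHandler are invented for illustration, the open callback is a stand-in for the daemon backend, and the 404 on error is a choice made here, not taken from the Docker router.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// archiveHandler returns a handler that streams whatever open() yields
// for the requested path into the response body.
func archiveHandler(open func(path string) (io.ReadCloser, error)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		rc, err := open(r.URL.Query().Get("path"))
		if err != nil {
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		defer rc.Close()
		// Headers must be set before the first byte of the body is written.
		w.Header().Set("Content-Type", "application/x-tar")
		io.Copy(w, rc)
	}
}

// demoHandler drives the handler with an in-memory recorder.
func demoHandler() (int, string, string) {
	h := archiveHandler(func(path string) (io.ReadCloser, error) {
		return io.NopCloser(strings.NewReader("tar:" + path)), nil
	})
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/containers/web/archive?path=/etc/hosts", nil)
	h(rec, req)
	return rec.Code, rec.Header().Get("Content-Type"), rec.Body.String()
}

func main() {
	fmt.Println(demoHandler())
}
```

Since io.Copy() pulls from the reader in chunks, the archive never has to be held in memory as a whole on the server side.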

The key question is therefore how ContainerArchivePath() packs the file (or directory). ContainerArchivePath() is defined in /daemon/archive.go:

func (daemon *Daemon) ContainerArchivePath(name string, path string) (content io.ReadCloser, stat *types.ContainerPathStat, err error) {
    container, err := daemon.GetContainer(name)
    if err != nil {
        return nil, nil, err
    }
    return daemon.containerArchivePath(container, path)
}

ContainerArchivePath() first looks up the container, then calls containerArchivePath() to pack the content inside it.
containerArchivePath() is defined as follows:

// containerArchivePath creates an archive of the filesystem resource at the specified
// path in this container. Returns a tar archive of the resource and stat info
// about the resource.
func (daemon *Daemon) containerArchivePath(container *container.Container, path string) (content io.ReadCloser, stat *types.ContainerPathStat, err error) {
    container.Lock()
    defer func() {
        if err != nil {
            // Wait to unlock the container until the archive is fully read
            // (see the ReadCloseWrapper func below) or if there is an error
            // before that occurs.
            container.Unlock()
        }
    }()
    if err = daemon.Mount(container); err != nil {
        return nil, nil, err
    }
    defer func() {
        if err != nil {
            // unmount any volumes
            container.UnmountVolumes(true, daemon.LogVolumeEvent)
            // unmount the container's rootfs
            daemon.Unmount(container)
        }
    }()
    if err = daemon.mountVolumes(container); err != nil {
        return nil, nil, err
    }
    //***Example resolvedPath: /var/lib/docker1/aufs/mnt/74c46809b458407e104da88a470e71924a49ddb7e5e259f24fc0e8ef48571a17/init_ssh.sh***//
    //***Example absPath: /init_ssh.sh***//
    resolvedPath, absPath, err := container.ResolvePath(path)
    if err != nil {
        return nil, nil, err
    }
    ......
    //***Call TarResourceRebase()***//
    data, err := archive.TarResourceRebase(resolvedPath, filepath.Base(absPath))
    if err != nil {
        return nil, nil, err
    }
    content = ioutils.NewReadCloserWrapper(data, func() error {
        err := data.Close()
        container.UnmountVolumes(true, daemon.LogVolumeEvent)
        daemon.Unmount(container)
        container.Unlock()
        return err
    })
    daemon.LogContainerEvent(container, "archive-path")
    return content, stat, nil
}

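containerArchivePath() first resolves resolvedPath and absPath, then calls archive.TarResourceRebase() to pack the file (or directory), and finally returns the stream. On success, the container lock and the mounts are released only when the caller closes the returned stream. This uses TarResourceRebase() from the archive package.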
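The "release resources when the stream is closed" idea is worth isolating. The sketch below is a minimal imitation of that pattern, not the ioutils implementation: cleanupReader, openArchive and demoCleanup are hypothetical names, and the lock and mount steps are simulated by appending to a log.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// cleanupReader wraps a reader and runs cleanup when the caller closes it.
type cleanupReader struct {
	io.Reader
	cleanup func() error
}

// Close hands control to the cleanup function supplied at construction.
func (c *cleanupReader) Close() error { return c.cleanup() }

// openArchive pretends to lock and mount a resource, then returns a
// stream whose Close() undoes both steps in reverse order.
func openArchive(log *[]string) io.ReadCloser {
	*log = append(*log, "lock", "mount")
	data := strings.NewReader("archive-bytes")
	return &cleanupReader{Reader: data, cleanup: func() error {
		*log = append(*log, "unmount", "unlock")
		return nil
	}}
}

// demoCleanup reads the stream to the end, closes it, and returns the
// recorded sequence of steps.
func demoCleanup() string {
	var log []string
	rc := openArchive(&log)
	io.Copy(io.Discard, rc)
	rc.Close()
	return strings.Join(log, ",")
}

func main() {
	fmt.Println(demoCleanup())
}
```

The design consequence is that the caller controls the lifetime of the lock: if the HTTP handler forgot to close the stream, the container would stay locked and mounted.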

putContainersArchive()

putContainersArchive() is defined in /api/server/router/container/copy.go:

//***Copy content into the container***//
func (s *containerRouter) putContainersArchive(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    v, err := httputils.ArchiveFormValues(r, vars)
    if err != nil {
        return err
    }
    noOverwriteDirNonDir := httputils.BoolValue(r, "noOverwriteDirNonDir")
    //***Call the Daemon's ContainerExtractToDir()***//
    return s.backend.ContainerExtractToDir(v.Name, v.Path, noOverwriteDirNonDir, r.Body)
}

putContainersArchive() mainly calls the daemon's ContainerExtractToDir() method.

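ContainerExtractToDir() is defined in /daemon/archive.go. The listing below is the unexported containerExtractToDir(), which does the actual work: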

func (daemon *Daemon) containerExtractToDir(container *container.Container, path string, noOverwriteDirNonDir bool, content io.Reader) (err error) {
    ......
    absPath := archive.PreserveTrailingDotOrSeparator(filepath.Join(string(filepath.Separator), path), path)
    // This will evaluate the last path element if it is a symlink.
    //***Example resolvedPath: /var/lib/docker1/aufs/mnt/74c46809b458407e104da88a470e71924a49ddb7e5e259f24fc0e8ef48571a17/home***//
    resolvedPath, err := container.GetResourcePath(absPath)
    if err != nil {
        return err
    }
    ......
    uid, gid := daemon.GetRemappedUIDGID()
    options := &archive.TarOptions{
        NoOverwriteDirNonDir: noOverwriteDirNonDir,
        ChownOpts: &archive.TarChownOptions{
            UID: uid, GID: gid, // TODO: should all ownership be set to root (either real or remapped)?
        },
    }
    //***Fankang***//
    //***Call Untar()***//
    if err := chrootarchive.Untar(content, resolvedPath, options); err != nil {
        return err
    }
    daemon.LogContainerEvent(container, "extract-to-dir")
    return nil
}

containerExtractToDir() mainly calls Untar() from chrootarchive to unpack the tar stream.

chrootarchive

Untar() in chrootarchive is defined in /pkg/chrootarchive/archive.go:

func Untar(tarArchive io.Reader, dest string, options *archive.TarOptions) error {
    return untarHandler(tarArchive, dest, options, true)
}

// Handler for teasing out the automatic decompression
func untarHandler(tarArchive io.Reader, dest string, options *archive.TarOptions, decompress bool) error {
    if tarArchive == nil {
        return fmt.Errorf("Empty archive")
    }
    if options == nil {
        options = &archive.TarOptions{}
    }
    if options.ExcludePatterns == nil {
        options.ExcludePatterns = []string{}
    }
    rootUID, rootGID, err := idtools.GetRootUIDGID(options.UIDMaps, options.GIDMaps)
    if err != nil {
        return err
    }
    dest = filepath.Clean(dest)
    if _, err := os.Stat(dest); os.IsNotExist(err) {
        if err := idtools.MkdirAllNewAs(dest, 0755, rootUID, rootGID); err != nil {
            return err
        }
    }
    r := ioutil.NopCloser(tarArchive)
    if decompress {
        decompressedArchive, err := archive.DecompressStream(tarArchive)
        if err != nil {
            return err
        }
        defer decompressedArchive.Close()
        r = decompressedArchive
    }
    return invokeUnpack(r, dest, options)
}

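Untar() calls untarHandler(), which creates the destination directory if it does not exist, transparently decompresses the stream, and then calls invokeUnpack().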

invokeUnpack() is defined in /pkg/chrootarchive/archive_unix.go:

func invokeUnpack(decompressedArchive io.Reader, dest string, options *archive.TarOptions) error {
    // We can't pass a potentially large exclude list directly via cmd line
    // because we easily overrun the kernel's max argument/environment size
    // when the full image list is passed (e.g. when this is used by
    // `docker load`). We will marshall the options via a pipe to the
    // child
    r, w, err := os.Pipe()
    if err != nil {
        return fmt.Errorf("Untar pipe failure: %v", err)
    }
    //***Invoke docker-untar***//
    //***Example dest: /var/lib/docker1/aufs/mnt/74c46809b458407e104da88a470e71924a49ddb7e5e259f24fc0e8ef48571a17/home***//
    cmd := reexec.Command("docker-untar", dest)
    //***Use decompressedArchive as the child's stdin***//
    cmd.Stdin = decompressedArchive
    cmd.ExtraFiles = append(cmd.ExtraFiles, r)
    output := bytes.NewBuffer(nil)
    cmd.Stdout = output
    cmd.Stderr = output
    if err := cmd.Start(); err != nil {
        return fmt.Errorf("Untar error on re-exec cmd: %v", err)
    }
    //write the options to the pipe for the untar exec to read
    if err := json.NewEncoder(w).Encode(options); err != nil {
        return fmt.Errorf("Untar json encode to pipe failed: %v", err)
    }
    w.Close()
    if err := cmd.Wait(); err != nil {
        // when `xz -d -c -q | docker-untar ...` failed on docker-untar side,
        // we need to exhaust `xz`'s output, otherwise the `xz` side will be
        // pending on write pipe forever
        io.Copy(ioutil.Discard, decompressedArchive)
        return fmt.Errorf("Error processing tar file(%v): %s", err, output)
    }
    return nil
}
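The option hand-off in invokeUnpack() above, JSON written into a pipe whose read end is given to the child, can be tried in a single process. This sketch deliberately skips the re-exec: options is a trimmed-down, hypothetical stand-in for archive.TarOptions, and both ends of the pipe live in the same program. In the real code the read end is appended to cmd.ExtraFiles, which makes it file descriptor 3 in the child.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// options is a hypothetical, trimmed-down stand-in for archive.TarOptions.
type options struct {
	NoOverwriteDirNonDir bool
	ExcludePatterns      []string
}

// sendOptions writes opts as JSON into w and closes it, which is the
// parent's part of the hand-off.
func sendOptions(w *os.File, opts options) error {
	defer w.Close()
	return json.NewEncoder(w).Encode(opts)
}

// recvOptions decodes options from r, which is the child's part.
func recvOptions(r *os.File) (options, error) {
	defer r.Close()
	var opts options
	err := json.NewDecoder(r).Decode(&opts)
	return opts, err
}

// demoPipe sends one options value through an OS pipe and returns
// what came out at the other end.
func demoPipe() (options, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return options{}, err
	}
	errc := make(chan error, 1)
	go func() {
		errc <- sendOptions(w, options{NoOverwriteDirNonDir: true, ExcludePatterns: []string{"*.tmp"}})
	}()
	opts, err := recvOptions(r)
	if err != nil {
		return opts, err
	}
	return opts, <-errc
}

func main() {
	fmt.Println(demoPipe())
}
```

A pipe has no practical size limit for this purpose, which is the point made in the source comment: a long exclude list would not fit on a command line but streams through a pipe without trouble.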

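This part is interesting: invokeUnpack() does not unpack the stream itself but runs docker-untar, feeding it the tar stream on stdin. (TODO: why unpacking goes through the reexec mechanism instead of being done directly still needs investigation; one plausible reason, visible in untar() further down, is that the child calls chroot() into the destination before unpacking.) Because this relies on Docker's reexec mechanism, the name must be registered somewhere; see /pkg/chrootarchive/init_unix.go: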

func init() {
    reexec.Register("docker-applyLayer", applyLayer)
    reexec.Register("docker-untar", untar)
}
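The registration above amounts to a name-to-function table that is consulted when the binary starts. As far as I understand the mechanism, the binary is started again with argv[0] set to the registered name and the child looks that name up; the sketch below models only the table and the lookup. register, dispatch and ran are names invented here, not the reexec API.

```go
package main

import "fmt"

// registry maps a command name to the function that should run when
// the program is started under that name.
var registry = map[string]func(){}

// register adds an entry and refuses duplicates.
func register(name string, fn func()) {
	if _, dup := registry[name]; dup {
		panic("re-exec name already registered: " + name)
	}
	registry[name] = fn
}

// dispatch runs the function registered under argv0 and reports
// whether one was found.
func dispatch(argv0 string) bool {
	fn, ok := registry[argv0]
	if ok {
		fn()
	}
	return ok
}

// ran records which entry points were executed.
var ran []string

func init() {
	register("docker-applyLayer", func() { ran = append(ran, "applyLayer") })
	register("docker-untar", func() { ran = append(ran, "untar") })
}

func main() {
	fmt.Println(dispatch("docker-untar"), dispatch("dockerd"), ran)
}
```

An unknown name simply falls through, which is what lets the same binary behave as the normal daemon when it is not started under one of the registered names.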

So Docker's "docker-untar" command is carried out by the untar() function, which is defined in /pkg/chrootarchive/archive_unix.go:

func untar() {
    runtime.LockOSThread()
    flag.Parse()
    var options *archive.TarOptions
    //read the options from the pipe "ExtraFiles"
    if err := json.NewDecoder(os.NewFile(3, "options")).Decode(&options); err != nil {
        fatal(err)
    }
    if err := chroot(flag.Arg(0)); err != nil {
        fatal(err)
    }
    //***Read the archive from stdin***//
    if err := archive.Unpack(os.Stdin, "/", options); err != nil {
        fatal(err)
    }
    // fully consume stdin in case it is zero padded
    if _, err := flush(os.Stdin); err != nil {
        fatal(err)
    }
    os.Exit(0)
}

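untar() reads the options from file descriptor 3, calls chroot() into the destination directory, and then calls Unpack() from the archive package to unpack the data read from stdin. Because of the chroot, the "/" passed to Unpack() is the target directory inside the container, so Unpack() writes the stream into that directory.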

Summary

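Copying from a container to the host: the archive package's TarResourceRebase() packs the file (or directory) inside the container, and the archive package's CopyTo() unpacks the stream on the host.
Copying from the host to a container: the archive package's TarResource() packs the file (or directory) on the host, and the archive package's Unpack() unpacks the stream into the target directory in the container.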

The next analysis will cover Docker's archive package.