本次分析介绍Docker的archive工具包。之前说到“docker cp”命令用到了TarResourceRebase(),CopyTo(),TarResource()和Unpack()。我们先来看压缩的代码。
压缩
TarResource()
TarResource()定义在/pkg/archive/copy.go中:
| func TarResource(sourceInfo CopyInfo) (content Archive, err error) { return TarResourceRebase(sourceInfo.Path, sourceInfo.RebaseName) }
可以看出,TarResource()调用的是TarResourceRebase()。
TarResourceRebase()
TarResourceRebase()也定义在/pkg/archive/copy.go中:
| func TarResourceRebase(sourcePath, rebaseName string) (content Archive, err error) { sourcePath = normalizePath(sourcePath) if _, err = os.Lstat(sourcePath); err != nil { return } sourceDir, sourceBase := SplitPathDirEntry(sourcePath) filter := []string{sourceBase} logrus.Debugf("copying %q from %q", sourceBase, sourceDir) return TarWithOptions(sourceDir, &TarOptions{ Compression: Uncompressed, IncludeFiles: filter, IncludeSourceDir: true, RebaseNames: map[string]string{ sourceBase: rebaseName, }, }) }
可以看到,TarResourceRebase()调用的是TarWithOptions()。
Tar()
在介绍TarWithOptions()之前,先来介绍Tar(),定义在/pkg/archive/archive.go中:
| func Tar(path string, compression Compression) (io.ReadCloser, error) { return TarWithOptions(path, &TarOptions{Compression: compression}) }
可以看到,Tar()调用的也是TarWithOptions()。
TarWithOptions()
TarWithOptions()定义在/pkg/archive/archive.go中:
// TarWithOptions archives the tree rooted at srcPath according to options and
// returns the read end of an io.Pipe that carries the (optionally compressed)
// tar stream.
//
// The archive is produced by a background goroutine. It writes tar data into
// compressWriter, which forwards into pipeWriter. Because io.Pipe is
// synchronous, the goroutine only makes progress while the caller reads from
// the returned reader. Callers should read to EOF or Close the reader. Once
// the reader is closed, writes fail with io.ErrClosedPipe, and the walk
// callback below returns that error to stop the walk.
//
// NOTE: this is an excerpt; the "......" lines mark code elided in the post.
func TarWithOptions(srcPath string, options *TarOptions) (io.ReadCloser, error) {
	srcPath = fixVolumePathPrefix(srcPath)

	// Pre-process the exclude patterns once. An invalid pattern is reported
	// before the pipe or the goroutine is created. (patterns/patDirs/exceptions
	// are consumed in the elided code.)
	patterns, patDirs, exceptions, err := fileutils.CleanPatterns(options.ExcludePatterns)

	if err != nil {
		return nil, err
	}

	pipeReader, pipeWriter := io.Pipe()

	// compressWriter layers the requested compression on top of pipeWriter.
	compressWriter, err := CompressStream(pipeWriter, options.Compression)
	if err != nil {
		return nil, err
	}

	go func() {
		ta := &tarAppender{
			TarWriter:         tar.NewWriter(compressWriter),
			Buffer:            pools.BufioWriter32KPool.Get(nil),
			SeenFiles:         make(map[uint64]string),
			UIDMaps:           options.UIDMaps,
			GIDMaps:           options.GIDMaps,
			WhiteoutConverter: getWhiteoutConverter(options.WhiteoutFormat),
		}

		// Close the layers from the innermost outward so each flushes into
		// the next: tar writer -> compressor -> pipe. Closing pipeWriter last
		// makes the reader see EOF. Close failures are only logged.
		defer func() {
			if err := ta.TarWriter.Close(); err != nil {
				logrus.Errorf("Can't close tar writer: %s", err)
			}

			if err := compressWriter.Close(); err != nil {
				logrus.Errorf("Can't close compress writer: %s", err)
			}

			if err := pipeWriter.Close(); err != nil {
				logrus.Errorf("Can't close pipe writer: %s", err)
			}
		}()

		// (elided in the post)
		......

		for _, include := range options.IncludeFiles {
			// rebaseName is used by the elided part of the walk callback.
			rebaseName := options.RebaseNames[include]

			walkRoot := getWalkRoot(srcPath, include)
			// NOTE(review): the error returned by filepath.Walk is discarded.
			filepath.Walk(walkRoot, func(filePath string, f os.FileInfo, err error) error {
				// (elided in the post; relFilePath is computed here)
				......

				if err := ta.addTarFile(filePath, relFilePath); err != nil {
					logrus.Errorf("Can't add file %s to tar: %s", filePath, err)
					// Per-file errors are only logged, except when the reader
					// has gone away: then stop walking, since nobody consumes
					// the output.
					if err == io.ErrClosedPipe {
						return err
					}
				}
				return nil
			})
		}
	}()

	return pipeReader, nil
}
TarWithOptions()的流程如下:
- 调用io.Pipe()生成pipeReader, pipeWriter;
- 调用CompressStream()封装pipeWriter成compressWriter;
- 生成tarAppender;
- 遍历文件,调用tarAppender的addTarFile()把文件写入数据流;
- 返回pipeReader。
先来看CompressStream():
| func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) { p := pools.BufioWriter32KPool buf := p.Get(dest) switch compression { case Uncompressed: writeBufWrapper := p.NewWriteCloserWrapper(buf, buf) return writeBufWrapper, nil case Gzip: gzWriter := gzip.NewWriter(dest) writeBufWrapper := p.NewWriteCloserWrapper(buf, gzWriter) return writeBufWrapper, nil case Bzip2, Xz: return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) default: return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) } }
CompressStream()的流程如下:
- 从sync.Pool的封装BufioWriter32KPool中取出一个buf(复用缓冲区,减少内存分配和GC压力);
- 依据压缩类型封装成不同的WriteCloser,NewWriteCloserWrapper()允许自己定义Close()方法;
- 目前只支持Uncompressed和Gzip两种压缩形式,Bzip2和Xz会返回"Unsupported compression format"错误。
再来看tarAppender的addTarFile():
// addTarFile writes the file system entry at path into the archive. It first
// writes a tar header, then, for non-empty regular files, the file contents.
// name is presumably the entry's name inside the archive; it is used in the
// elided part of this excerpt (TODO confirm).
//
// Symlinks are not followed. The entry is Lstat'ed, and if it is a symlink its
// target is read with os.Readlink and passed to tar.FileInfoHeader.
//
// NOTE: this is an excerpt; "......" marks code elided in the post.
func (ta *tarAppender) addTarFile(path, name string) error {
	fi, err := os.Lstat(path)
	if err != nil {
		return err
	}

	link := ""
	if fi.Mode()&os.ModeSymlink != 0 {
		if link, err = os.Readlink(path); err != nil {
			return err
		}
	}

	hdr, err := tar.FileInfoHeader(fi, link)
	// (elided in the post)
	......

	if err := ta.TarWriter.WriteHeader(hdr); err != nil {
		return err
	}

	// Only regular files with data have a body; every other entry is header-only.
	if hdr.Typeflag == tar.TypeReg && hdr.Size > 0 {
		file, err := os.Open(path)
		if err != nil {
			return err
		}

		// Route the copy through ta.Buffer, pointed at the tar writer. The
		// deferred Reset(nil) detaches it again when addTarFile returns.
		ta.Buffer.Reset(ta.TarWriter)
		defer ta.Buffer.Reset(nil)
		_, err = io.Copy(ta.Buffer, file)
		// The Close error of this read-only file is ignored.
		file.Close()
		if err != nil {
			return err
		}
		// Push whatever is still buffered into the tar writer.
		err = ta.Buffer.Flush()
		if err != nil {
			return err
		}
	}

	return nil
}
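addTarFile()的流程如下:
- 通过os.Lstat()获取文件信息(如果是符号链接,还会用os.Readlink()读取链接目标),并据此生成tar header;
- 写入tar header(文件头)信息;
- 如果是非空的普通文件,打开并读取文件内容;
- 把文件内容经由ta.Buffer写入到TarWriter中。
写入TarWriter的数据会依次经过compressWriter压缩并写入pipeWriter,调用方从返回的pipeReader中就能读到打包(并压缩)后的数据流。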
解压
CopyTo()
先来看CopyTo(),定义在/pkg/archive/copy.go中:
| func CopyTo(content Reader, srcInfo CopyInfo, dstPath string) error { dstInfo, err := CopyInfoDestinationPath(normalizePath(dstPath)) if err != nil { return err } dstDir, copyArchive, err := PrepareArchiveCopy(content, srcInfo, dstInfo) if err != nil { return err } defer copyArchive.Close() options := &TarOptions{ NoLchown: true, NoOverwriteDirNonDir: true, } return Untar(copyArchive, dstDir, options) }
可以看到,CopyTo()调用了Untar()。
Untar()
Untar()定义在/pkg/archive/archive.go中:
| func Untar(tarArchive io.Reader, dest string, options *TarOptions) error { return untarHandler(tarArchive, dest, options, true) } func UntarUncompressed(tarArchive io.Reader, dest string, options *TarOptions) error { return untarHandler(tarArchive, dest, options, false) } func untarHandler(tarArchive io.Reader, dest string, options *TarOptions, decompress bool) error { if tarArchive == nil { return fmt.Errorf("Empty archive") } dest = filepath.Clean(dest) if options == nil { options = &TarOptions{} } if options.ExcludePatterns == nil { options.ExcludePatterns = []string{} } r := tarArchive if decompress { decompressedArchive, err := DecompressStream(tarArchive) if err != nil { return err } defer decompressedArchive.Close() r = decompressedArchive } return Unpack(r, dest, options) }
这里Untar()和UntarUncompressed()都会调用untarHandler()。
untarHandler()的逻辑是:如果需要解压,先调用DecompressStream()对数据流解压,然后调用Unpack()解包。
DecompressStream()
DecompressStream()定义在/pkg/archive/archive.go中:
| func DecompressStream(archive io.Reader) (io.ReadCloser, error) { p := pools.BufioReader32KPool buf := p.Get(archive) bs, err := buf.Peek(10) if err != nil && err != io.EOF { return nil, err } compression := DetectCompression(bs) switch compression { case Uncompressed: readBufWrapper := p.NewReadCloserWrapper(buf, buf) return readBufWrapper, nil case Gzip: gzReader, err := gzip.NewReader(buf) if err != nil { return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, gzReader) return readBufWrapper, nil case Bzip2: bz2Reader := bzip2.NewReader(buf) readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) return readBufWrapper, nil case Xz: xzReader, chdone, err := xzDecompress(buf) if err != nil { return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) return ioutils.NewReadCloserWrapper(readBufWrapper, func() error { <-chdone return readBufWrapper.Close() }), nil default: return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) } }
DecompressStream()会在Reader的基础上加一层解压操作。
Unpack()
Unpack()定义在/pkg/archive/archive.go中:
// Unpack extracts the entries of an already-decompressed tar stream into dest.
//
// Entries are read one by one with tr.Next(), and each is materialized with
// createTarFile. Lchown is requested unless options.NoLchown is set. Directory
// headers are collected in dirs, presumably so their metadata can be
// re-applied after their contents; that code is elided here (TODO confirm).
//
// NOTE: this is an excerpt; "......" marks code elided in the post. In
// particular, the declaration of dirs, the handling of the error from
// tr.Next(), and the use of rel are not shown.
func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error {
	tr := tar.NewReader(decompressedArchive)
	// Borrow a pooled buffered reader for entry bodies; returned on exit.
	trBuf := pools.BufioReader32KPool.Get(nil)
	defer pools.BufioReader32KPool.Put(trBuf)

	// (elided in the post)
	......

loop:
	for {
		hdr, err := tr.Next()
		// (elided in the post)
		......

		path := filepath.Join(dest, hdr.Name)
		// rel is presumably checked in the elided code to reject entries that
		// would land outside dest (TODO confirm).
		rel, err := filepath.Rel(dest, path)
		// (elided in the post)
		......

		// Point the pooled buffer at the current entry's data.
		trBuf.Reset(tr)
		// (elided in the post)
		......

		if err := createTarFile(path, dest, hdr, trBuf, !options.NoLchown, options.ChownOpts); err != nil {
			return err
		}

		if hdr.Typeflag == tar.TypeDir {
			dirs = append(dirs, hdr)
		}
	}
	// (elided in the post)
	......
	return nil
}
Unpack()的流程如下:
- 通过tar.NewReader()把Reader封装成tarReader;
- 然后循环调用Next()逐个获取数据流中的文件条目(tar header);
- 调用createTarFile()创建文件。
createTarFile()
| func createTarFile(path, extractDir string, hdr *tar.Header, reader io.Reader, Lchown bool, chownOpts *TarChownOptions) error { hdrInfo := hdr.FileInfo() switch hdr.Typeflag { case tar.TypeDir: if fi, err := os.Lstat(path); !(err == nil && fi.IsDir()) { if err := os.Mkdir(path, hdrInfo.Mode()); err != nil { return err } } case tar.TypeReg, tar.TypeRegA: file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, hdrInfo.Mode()) if err != nil { return err } if _, err := io.Copy(file, reader); err != nil { file.Close() return err } file.Close() case tar.TypeBlock, tar.TypeChar, tar.TypeFifo: if err := handleTarTypeBlockCharFifo(hdr, path); err != nil { return err } case tar.TypeLink: targetPath := filepath.Join(extractDir, hdr.Linkname) if !strings.HasPrefix(targetPath, extractDir) { return breakoutError(fmt.Errorf("invalid hardlink %q -> %q", targetPath, hdr.Linkname)) } if err := os.Link(targetPath, path); err != nil { return err } case tar.TypeSymlink: targetPath := filepath.Join(filepath.Dir(path), hdr.Linkname) if !strings.HasPrefix(targetPath, extractDir) { return breakoutError(fmt.Errorf("invalid symlink %q -> %q", path, hdr.Linkname)) } if err := os.Symlink(hdr.Linkname, path); err != nil { return err } case tar.TypeXGlobalHeader: logrus.Debug("PAX Global Extended Headers found and ignored") return nil default: return fmt.Errorf("Unhandled tar header type %d\n", hdr.Typeflag) } if Lchown && runtime.GOOS != "windows" { if chownOpts == nil { chownOpts = &TarChownOptions{UID: hdr.Uid, GID: hdr.Gid} } if err := os.Lchown(path, chownOpts.UID, chownOpts.GID); err != nil { return err } } var errors []string for key, value := range hdr.Xattrs { if err := system.Lsetxattr(path, key, []byte(value), 0); err != nil { if err == syscall.ENOTSUP { errors = append(errors, err.Error()) continue } return err } } if len(errors) > 0 { logrus.WithFields(logrus.Fields{ "errors": errors, }).Warn("ignored xattrs in archive: underlying filesystem doesn't support them") } if err := 
handleLChmod(hdr, path, hdrInfo); err != nil { return err } aTime := hdr.AccessTime if aTime.Before(hdr.ModTime) { aTime = hdr.ModTime } if hdr.Typeflag == tar.TypeLink { if fi, err := os.Lstat(hdr.Linkname); err == nil && (fi.Mode()&os.ModeSymlink == 0) { if err := system.Chtimes(path, aTime, hdr.ModTime); err != nil { return err } } } else if hdr.Typeflag != tar.TypeSymlink { if err := system.Chtimes(path, aTime, hdr.ModTime); err != nil { return err } } else { ts := []syscall.Timespec{timeToTimespec(aTime), timeToTimespec(hdr.ModTime)} if err := system.LUtimesNano(path, ts); err != nil && err != system.ErrNotSupportedPlatform { return err } } return nil }
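createTarFile()根据header的类型分别创建目录、普通文件、块设备/字符设备/FIFO、硬链接或符号链接(链接目标必须位于解压目录之内,否则返回breakout错误),然后依次设置属主(Lchown)、扩展属性(xattrs)、权限和访问/修改时间。其中的一些细节目前还没吃透。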
Demo
下面以Tar()和Untar()两个入口演示如何使用Docker的archive工具。
| package main import ( "fmt" "github.com/docker/docker/pkg/archive" ) func main() { path := "/home/fankang/docker/data" compress := archive.Gzip tarArchive, err := archive.Tar(path, compress) if err != nil { fmt.Println(err) } dest_path := "/home/fankang/docker/newdata" options := &archive.TarOptions{Compression: compress} archive.Untar(tarArchive, dest_path, options) }