之前在kubectl printer中已经分析过kubectl是如何以YAML格式打印内容的,本次分析将介绍kubectl是如何从YAML格式文件输入内容并做转换的。

FileVisitor

我们在使用kubectl create -f时,可以从文件创建资源,文件的格式可以为JSON格式或YAML格式。其中YAML格式支持”—“分割符,可以把多个YAML写在一个文件中。读取文件的操作通过FileVisitor进行,FileVisitor定义在/pkg/kubectl/resource/visitor.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//***读取文件的visitor***//
//***是对StreamVisitor的封装***//
type FileVisitor struct {
Path string
*StreamVisitor
}
// Visit in a FileVisitor is just taking care of opening/closing files
func (v *FileVisitor) Visit(fn VisitorFunc) error {
var f *os.File
if v.Path == constSTDINstr {
f = os.Stdin
} else {
var err error
if f, err = os.Open(v.Path); err != nil {
return err
}
}
defer f.Close()
v.StreamVisitor.Reader = f
return v.StreamVisitor.Visit(fn)
}

可以看出,FileVisitor打开文件后,调用了StreamVisitor。StreamVisitor定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//***从io.reader中获取数据流(yaml格式),然后转换成json格式,最后进行schema检查,封装成info对象***//
type StreamVisitor struct {
io.Reader
*Mapper
Source string
Schema validation.Schema
}
// NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same.
func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, schema validation.Schema) *StreamVisitor {
return &StreamVisitor{
Reader: r,
Mapper: mapper,
Source: source,
Schema: schema,
}
}
// Visit implements Visitor over a stream. StreamVisitor is able to distinct multiple resources in one stream.
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
for {
ext := runtime.RawExtension{}
//***StreamVisitor使用NewYAMLOrJSONDecoder(定义在/pkg/util/yaml/decoder.go中)对YAML进行解析***//
if err := d.Decode(&ext); err != nil {
if err == io.EOF {
return nil
}
return err
}
// TODO: This needs to be able to handle object in other encodings and schemas.
ext.Raw = bytes.TrimSpace(ext.Raw)
if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {
continue
}
if err := ValidateSchema(ext.Raw, v.Schema); err != nil {
return fmt.Errorf("error validating %q: %v", v.Source, err)
}
//***调用InfoForData()(在mapper.go中)生成info信息***//
//***在InfoForData中会把json对象转换成对应的struct类型***//
info, err := v.InfoForData(ext.Raw, v.Source)
if err != nil {
if fnErr := fn(info, err); fnErr != nil {
return fnErr
}
continue
}
if err := fn(info, nil); err != nil {
return err
}
}
}

可以看出,StreamVisitor的Visit()函数先通过调用NewYAMLOrJSONDecoder()生成decoder,然后使用Decode()对YAML文件中每一个YAML体进行解码。

YAMLOrJSONDecoder

所以,接下来来看YAMLOrJSONDecoder,定义在/pkg/util/yaml/decoder.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// YAMLOrJSONDecoder attempts to decode a stream of JSON documents or
// YAML documents by sniffing for a leading { character.
type YAMLOrJSONDecoder struct {
r io.Reader
bufferSize int
decoder decoder
}
// NewYAMLOrJSONDecoder returns a decoder that will process YAML documents
// or JSON documents from the given reader as a stream. bufferSize determines
// how far into the stream the decoder will look to figure out whether this
// is a JSON stream (has whitespace followed by an open brace).
func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder {
return &YAMLOrJSONDecoder{
r: r,
bufferSize: bufferSize,
}
}
// Decode unmarshals the next object from the underlying stream into the
// provide object, or returns an error.
func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
if d.decoder == nil {
buffer, isJSON := GuessJSONStream(d.r, d.bufferSize)
if isJSON {
//***以json方式处理***//
glog.V(4).Infof("decoding stream as JSON")
d.decoder = json.NewDecoder(buffer)
} else {
//***以yaml方式处理***//
glog.V(4).Infof("decoding stream as YAML")
d.decoder = NewYAMLToJSONDecoder(buffer)
}
}
err := d.decoder.Decode(into)
if jsonDecoder, ok := d.decoder.(*json.Decoder); ok {
if syntax, ok := err.(*json.SyntaxError); ok {
data, readErr := ioutil.ReadAll(jsonDecoder.Buffered())
if readErr != nil {
glog.V(4).Infof("reading stream failed: %v", readErr)
}
js := string(data)
start := strings.LastIndex(js[:syntax.Offset], "\n") + 1
line := strings.Count(js[:start], "\n")
return fmt.Errorf("json: line %d: %s", line, syntax.Error())
}
}
return err
}

可以看出,YAMLOrJSONDecoder的Decode()方法会依据数据的格式来使用不同的解码器。其实判断是JSON格式还是YAML格式的方法很简单,以”{“开头的就认为是JSON格式,其他为YAML格式,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// GuessJSONStream scans the provided reader up to size, looking
// for an open brace indicating this is JSON. It will return the
// bufio.Reader it creates for the consumer.
func GuessJSONStream(r io.Reader, size int) (io.Reader, bool) {
buffer := bufio.NewReaderSize(r, size)
//***Fankang***//
//***Peek返回缓存的一个切片,该切片引用缓存中前n字节数据***//
b, _ := buffer.Peek(size)
return buffer, hasJSONPrefix(b)
}
var jsonPrefix = []byte("{")
// hasJSONPrefix returns true if the provided buffer appears to start with
// a JSON open brace.
func hasJSONPrefix(buf []byte) bool {
return hasPrefix(buf, jsonPrefix)
}
// Return true if the first non-whitespace bytes in buf is
// prefix.
func hasPrefix(buf []byte, prefix []byte) bool {
trim := bytes.TrimLeftFunc(buf, unicode.IsSpace)
return bytes.HasPrefix(trim, prefix)
}

如果是YAML格式,YAMLOrJSONDecoder将通过NewYAMLToJSONDecoder()生成YAMLToJSONDecoder,然后调用YAMLToJSONDecoder的decode()方法。

YAMLToJSONDecoder

YAMLToJSONDecoder用来解码YAML文件。YAMLToJSONDecoder定义在/pkg/util/yaml/decoder.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// YAMLToJSONDecoder decodes YAML documents from an io.Reader by
// separating individual documents. It first converts the YAML
// body to JSON, then unmarshals the JSON.
type YAMLToJSONDecoder struct {
reader Reader
}
// NewYAMLToJSONDecoder decodes YAML documents from the provided
// stream in chunks by converting each document (as defined by
// the YAML spec) into its own chunk, converting it to JSON via
// yaml.YAMLToJSON, and then passing it to json.Decoder.
func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
reader := bufio.NewReader(r)
return &YAMLToJSONDecoder{
//***YAMLToJSONDecoder的reader本质为YAMLReader***//
reader: NewYAMLReader(reader),
}
}
// Decode reads a YAML document as JSON from the stream or returns
// an error. The decoding rules match json.Unmarshal, not
// yaml.Unmarshal.
//***把yaml转换成json,然后转换成结构体***//
func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
bytes, err := d.reader.Read()
if err != nil && err != io.EOF {
return err
}
if len(bytes) != 0 {
data, err := yaml.YAMLToJSON(bytes)
if err != nil {
return err
}
return json.Unmarshal(data, into)
}
return err
}

可以看出,YAMLToJSONDecoder的Decode()方法是通过reader.Read()来读取一个YAML体,然后使用yaml.YAMLToJSON()把YAML体转换成JSON格式,最后使用json.Unmarshal()来把JSON转换成结构体值。
所以现在的关键是reader.Read()是如何读取YAML文件的,又是如何处理分割符的。从NewYAMLToJSONDecoder()可以看出,YAMLToJSONDecoder的reader本质为YAMLReader。

YAMLReader

YAMLReader可以从数据流中读取一个YAML体,不同的YAML体是通过分割符”—“来区分的。YAMLReader定义在/pkg/util/yaml/decoder.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type YAMLReader struct {
reader Reader
}
func NewYAMLReader(r *bufio.Reader) *YAMLReader {
return &YAMLReader{
reader: &LineReader{reader: r},
}
}
// Read returns a full YAML document.
func (r *YAMLReader) Read() ([]byte, error) {
var buffer bytes.Buffer
for {
line, err := r.reader.Read()
if err != nil && err != io.EOF {
return nil, err
}
//***separator为yaml分割符***//
//***如果遇到分割符的处理***//
sep := len([]byte(separator))
if i := bytes.Index(line, []byte(separator)); i == 0 {
// We have a potential document terminator
i += sep
after := line[i:]
if len(strings.TrimRightFunc(string(after), unicode.IsSpace)) == 0 {
if buffer.Len() != 0 {
//***返回分割符之前的数据***//
return buffer.Bytes(), nil
}
if err == io.EOF {
return nil, err
}
}
}
if err == io.EOF {
if buffer.Len() != 0 {
// If we're at EOF, we have a final, non-terminated line. Return it.
return buffer.Bytes(), nil
}
return nil, err
}
//***把line写入到buffer中***//
buffer.Write(line)
}
}

YAMLReader的Read()方法会对读取数据流中的数据,并写入到buffer中,如果遇到分割,则返回buffer中的内容;如果遇到结束符,也直接返回buffer内容。
由于数据流只有一个,所以YAMLReader的Reader()每次只读一个YAML体,第二次运行时将读第二个YAML体,依此类推。
YAMLReader是通过LineReader来读取行的。

LineReader

LineReader用于读取数据流中的一行,定义在/pkg/util/yaml/decoder.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type LineReader struct {
reader *bufio.Reader
}
// Read returns a single line (with '\n' ended) from the underlying reader.
// An error is returned iff there is an error with the underlying reader.
func (r *LineReader) Read() ([]byte, error) {
var (
isPrefix bool = true
err error = nil
line []byte
buffer bytes.Buffer
)
for isPrefix && err == nil {
line, isPrefix, err = r.reader.ReadLine()
buffer.Write(line)
}
buffer.WriteByte('\n')
return buffer.Bytes(), err
}

可以看出,LineReader的Read()通过调用bufio.Reader的ReadLine()方法读取一行,并在结尾加上’\n’。

分析结束。