接下来,开始分析kubernetes的client相关代码。之前有篇”RESTClient,DynamicClient和ClientSet Demo”( https://fankangbest.github.io/2017/07/15/RESTClient-DynamicClient%E5%92%8CClientSet-Demo/ )的分析介绍三种client的用法,三种client都使用下面代码生成config:

1
2
3
4
5
6
// Path to the kubeconfig file; the flag defaults to /root/.kube/config.
kubeconfig := flag.String("kubeconfig", "/root/.kube/config", "Path to a kube config. Only required if out-of-cluster.")
flag.Parse()
// The master URL argument is left empty, so only the kubeconfig path is supplied.
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
	// NOTE(review): only a fixed message is printed here; err itself is not
	// shown and execution is not stopped within this snippet.
	fmt.Println("BuildConfigFromFlags error")
}

这段代码就是读取/root/.kube/config文件,然后生成config。我们之前分析过kubernetes的kubeconfig机制( https://fankangbest.github.io/2017/07/09/Config%E6%9C%BA%E5%88%B6%E4%BD%BF%E7%94%A8-v1-5-2/ ),里面有介绍如何使用kubeconfig机制。

kubeconfig

kubeconfig中定义了与apiserver相关的基本信息,可以通过读取kubeconfig生成相应的config,然后进一步生成client。kubeconfig结构如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
apiVersion: v1
kind: Config
# Named users with their client certificate and key.
users:
- name: node
  user:
    client-certificate: /etc/kubernetes/security/serverkey/server.crt
    client-key: /etc/kubernetes/security/serverkey/server.key
# Named clusters with their server address and certificate authority.
clusters:
- name: local
  cluster:
    certificate-authority: /etc/kubernetes/security/serverkey/ca.crt
    server: https://kubernetes:6443
# A context pairs one cluster with one user under a name.
contexts:
- context:
    cluster: local
    user: node
  name: my-context
# Name of the context that is currently selected.
current-context: my-context

那么,kubernetes是如何把kubeconfig转换成client的config的呢?本次将详细分析。

BuildConfigFromFlags()

从config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)可以看出,通过BuildConfigFromFlags()可以将kubeconfig转换成client的config。来看下BuildConfigFromFlags(),定义在/pkg/client/unversioned/clientcmd/client_config.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// BuildConfigFromFlags builds a *restclient.Config from a master URL and/or a
// kubeconfig file path.
//
// When both arguments are empty it first tries restclient.InClusterConfig()
// and returns that config on success; on failure it logs a warning and falls
// through. Otherwise (and as the fallback) it wraps kubeconfigPath in
// ClientConfigLoadingRules and masterUrl in ConfigOverrides, and returns the
// result of DeferredLoadingClientConfig.ClientConfig().
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
	if masterUrl == "" && kubeconfigPath == "" {
		glog.Warningf("Neither --kubeconfig nor --master was specified. Using the inClusterConfig. This might not work.")
		inCluster, inClusterErr := restclient.InClusterConfig()
		if inClusterErr == nil {
			return inCluster, nil
		}
		glog.Warning("error creating inClusterConfig, falling back to default config: ", inClusterErr)
	}

	// The explicit kubeconfig path drives loading; the master URL is applied
	// as an override of the cluster's server address.
	loadingRules := &ClientConfigLoadingRules{ExplicitPath: kubeconfigPath}
	overrides := &ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}
	return NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides).ClientConfig()
}

可以看出,BuildConfigFromFlags()先调用NewNonInteractiveDeferredLoadingClientConfig()生成DeferredLoadingClientConfig,然后调用DeferredLoadingClientConfig的ClientConfig()方法生成client的config。NewNonInteractiveDeferredLoadingClientConfig()的参数为ClientConfigLoadingRules和ConfigOverrides。
下面将对这些概念进行分析。

ClientConfigLoadingRules

ClientConfigLoadingRules实现了ClientConfigLoader,定义在/pkg/client/unversioned/clientcmd/loader.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ClientConfigLoader is the interface for objects that can load a
// clientcmdapi.Config. It embeds ConfigAccess.
type ClientConfigLoader interface {
	ConfigAccess
	// IsDefaultConfig returns true if the returned config matches the defaults.
	IsDefaultConfig(*restclient.Config) bool
	// Load returns the latest config
	Load() (*clientcmdapi.Config, error)
}
// ClientConfigLoadingRules describes which kubeconfig files Load() reads and
// how they are prepared before being merged.
type ClientConfigLoadingRules struct {
	// ExplicitPath, when non-empty, is the only file Load() reads; Load()
	// returns an error if that file does not exist.
	ExplicitPath string
	// Precedence is the list of files Load() reads when ExplicitPath is empty.
	Precedence []string
	// MigrationRules is a map of destination files to source files. If a destination file is not present, then the source file is checked.
	// If the source file is present, then it is copied to the destination file BEFORE any further loading happens.
	MigrationRules map[string]string
	// DoNotResolvePaths indicates whether or not to resolve paths with respect to the originating files. This is phrased as a negative so
	// that a default object that doesn't set this will usually get the behavior it wants.
	DoNotResolvePaths bool
	// DefaultClientConfig is an optional field indicating what rules to use to calculate a default configuration.
	// This should match the overrides passed in to ClientConfig loader.
	DefaultClientConfig ClientConfig
}

其中:

  • ExplicitPath: 表示显式指定的kubeconfig路径,优先级高于Precedence(Load()中只要ExplicitPath非空,就只读取该文件,不再使用Precedence);
  • Precedence: 表示从KUBECONFIG等环境变量中获取到的路径;
  • MigrationRules: 表示新路径(destination)到旧路径(source)的映射关系,当新路径不存在而旧路径存在时,会先把旧文件拷贝到新路径;
  • DoNotResolvePaths: 表示是否"不"对路径做解析(相对于路径所在的kubeconfig文件);之所以用否定式命名,是为了让未设置该字段的默认对象通常就能得到期望的行为;
  • DefaultClientConfig: 表示默认的配置。

接下来看下ClientConfigLoadingRules的主要方法。

Load()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Load extracts the information from the kubeconfig file(s) and builds a
// single merged clientcmdapi.Config.
//
// It first runs Migrate(), then picks the files to read: ExplicitPath alone
// when it is set (a missing file is an immediate error), otherwise every entry
// of Precedence. Empty names and missing files are skipped; other load errors
// are collected. The loaded configs are merged with mergo in two passes
// (forward, then reverse) and combined. If rules.ResolvePaths() is true,
// ResolveLocalPaths() is applied to the result. The merged config is returned
// together with the aggregate of the collected errors.
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
	if err := rules.Migrate(); err != nil {
		return nil, err
	}
	errlist := []error{}
	kubeConfigFiles := []string{}
	// Make sure a file we were explicitly told to use exists
	if len(rules.ExplicitPath) > 0 {
		if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
			return nil, err
		}
		kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)
	} else {
		kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
	}
	kubeconfigs := []*clientcmdapi.Config{}
	// read and cache the config files so that we only look at them once
	// e.g. filename: /root/.kube/config
	for _, filename := range kubeConfigFiles {
		if len(filename) == 0 {
			// no work to do
			continue
		}
		config, err := LoadFromFile(filename)
		if os.IsNotExist(err) {
			// skip missing files
			continue
		}
		if err != nil {
			errlist = append(errlist, fmt.Errorf("Error loading config file \"%s\": %v", filename, err))
			continue
		}
		kubeconfigs = append(kubeconfigs, config)
	}
	// first merge all of our maps
	mapConfig := clientcmdapi.NewConfig()
	for _, kubeconfig := range kubeconfigs {
		// This calls the third-party package imdario/mergo:
		// it merges the content of kubeconfig into mapConfig.
		// NOTE(review): the error returned by mergo.Merge is not checked.
		mergo.Merge(mapConfig, kubeconfig)
	}
	// merge all of the struct values in the reverse order so that priority is given correctly
	// errors are not added to the list the second time
	nonMapConfig := clientcmdapi.NewConfig()
	for i := len(kubeconfigs) - 1; i >= 0; i-- {
		kubeconfig := kubeconfigs[i]
		mergo.Merge(nonMapConfig, kubeconfig)
	}
	// since values are overwritten, but maps values are not, we can merge the non-map config on top of the map config and
	// get the values we expect.
	config := clientcmdapi.NewConfig()
	mergo.Merge(config, mapConfig)
	mergo.Merge(config, nonMapConfig)
	if rules.ResolvePaths() {
		// A path-resolution failure is collected, not returned early.
		if err := ResolveLocalPaths(config); err != nil {
			errlist = append(errlist, err)
		}
	}
	return config, utilerrors.NewAggregate(errlist)
}

Load()流程如下:

  1. 调用Migrate()把旧kubeconfig文件转换成新kubeconfig文件,这样可以保持兼容性;
  2. 把ExplicitPath或Precedence合并成kubeConfigFiles;
  3. 调用LoadFromFile()读取kubeconfigFiles中的kubeconfig文件,生成kubeconfigs;
  4. 调用第三方包imdario/mergo的Merge(),把kubeconfigs合并成mapConfig;
  5. 反序把kubeconfigs合并成nonMapConfig;
  6. 把mapConfig和nonMapConfig合并成config;
  7. 调用ResolveLocalPaths()解决config中相对路径的问题。

其中步骤4, 5, 6过程感觉有些疑惑,还不知道为什么要这么做,待研究完imdario/mergo再回过头来看。

Migrate()

把旧的/root/.kube/.kubeconfig内容转移至/root/.kube/config。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Migrate applies rules.MigrationRules, a map of destination path to source
// path. For each entry whose destination does not exist yet and whose source
// exists, is accessible and is not a directory, the source file is copied to
// the destination.
//
// Entries are skipped (without error) when the destination already exists,
// when either path cannot be accessed because of permissions, or when the
// source does not exist. Any other stat error, a directory source, or a
// failure while copying is returned. A nil MigrationRules map is a no-op.
func (rules *ClientConfigLoadingRules) Migrate() error {
	if rules.MigrationRules == nil {
		return nil
	}

	for destination, source := range rules.MigrationRules {
		if _, err := os.Stat(destination); err == nil {
			// if the destination already exists, do nothing
			continue
		} else if os.IsPermission(err) {
			// if we can't access the file, skip it
			continue
		} else if !os.IsNotExist(err) {
			// if we had an error other than non-existence, fail
			return err
		}

		if sourceInfo, err := os.Stat(source); err != nil {
			if os.IsNotExist(err) || os.IsPermission(err) {
				// if the source file doesn't exist or we can't access it, there's no work to do.
				continue
			}
			// if we had an error other than non-existence, fail
			return err
		} else if sourceInfo.IsDir() {
			return fmt.Errorf("cannot migrate %v to %v because it is a directory", source, destination)
		}

		// The copy lives in its own function so both files are closed at the
		// end of every iteration instead of staying open until Migrate returns.
		if err := migrateFile(source, destination); err != nil {
			return err
		}
	}
	return nil
}

// migrateFile copies the contents of source into a newly created destination
// file and closes both files before returning.
func migrateFile(source, destination string) (err error) {
	in, err := os.Open(source)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(destination)
	if err != nil {
		return err
	}
	defer func() {
		// Report a failed Close on the written file unless an earlier error
		// is already being returned.
		if closeErr := out.Close(); err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(out, in)
	return err
}

LoadFromFile()

读取kubeconfig,然后生成config。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// LoadFromFile reads the kubeconfig at filename, decodes it with Load(), and
// records filename as LocationOfOrigin on every AuthInfo, Cluster and Context
// entry. Any of the three maps that is nil after decoding is replaced by an
// empty map. Read and decode errors are returned unchanged.
func LoadFromFile(filename string) (*clientcmdapi.Config, error) {
	raw, readErr := ioutil.ReadFile(filename)
	if readErr != nil {
		return nil, readErr
	}

	loaded, decodeErr := Load(raw)
	if decodeErr != nil {
		return nil, decodeErr
	}
	glog.V(6).Infoln("Config loaded from file", filename)

	// Users: stamp the originating file, then guarantee a non-nil map.
	for name, authInfo := range loaded.AuthInfos {
		authInfo.LocationOfOrigin = filename
		loaded.AuthInfos[name] = authInfo
	}
	if loaded.AuthInfos == nil {
		loaded.AuthInfos = map[string]*clientcmdapi.AuthInfo{}
	}

	// Clusters.
	for name, cluster := range loaded.Clusters {
		cluster.LocationOfOrigin = filename
		loaded.Clusters[name] = cluster
	}
	if loaded.Clusters == nil {
		loaded.Clusters = map[string]*clientcmdapi.Cluster{}
	}

	// Contexts.
	for name, context := range loaded.Contexts {
		context.LocationOfOrigin = filename
		loaded.Contexts[name] = context
	}
	if loaded.Contexts == nil {
		loaded.Contexts = map[string]*clientcmdapi.Context{}
	}

	return loaded, nil
}

Load()

把data解码到Config结构体中。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Load decodes raw kubeconfig bytes into a clientcmdapi.Config.
//
// Empty input is not an error: the object from clientcmdapi.NewConfig() is
// returned as-is. Otherwise data is decoded with clientcmdlatest.Codec as kind
// "Config" at clientcmdlatest.Version, and any decode error is returned.
func Load(data []byte) (*clientcmdapi.Config, error) {
	defaults := clientcmdapi.NewConfig()
	if len(data) == 0 {
		// The decoder rejects empty input, so hand back the default object
		// instead of failing.
		return defaults, nil
	}

	gvk := &unversioned.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}
	obj, _, err := clientcmdlatest.Codec.Decode(data, gvk, defaults)
	if err != nil {
		return nil, err
	}
	return obj.(*clientcmdapi.Config), nil
}

ConfigOverrides

ConfigOverrides定义在/pkg/client/unversioned/clientcmd/overrides.go中:

1
2
3
4
5
6
7
8
9
10
11
// ConfigOverrides holds values that should override whatever information is pulled from the actual Config object. You can't
// simply use an actual Config object, because Configs hold maps, but overrides are restricted to "at most one"
type ConfigOverrides struct {
	// AuthInfo holds user/credential overrides.
	// NOTE(review): how it is merged is not visible in this excerpt — confirm in getAuthInfo().
	AuthInfo clientcmdapi.AuthInfo
	// ClusterDefaults are applied before the configured cluster info is loaded.
	ClusterDefaults clientcmdapi.Cluster
	// ClusterInfo is merged after the configured cluster info in getCluster().
	ClusterInfo clientcmdapi.Cluster
	// Context holds context overrides; a non-empty Context.Cluster is used as
	// the cluster name by getClusterName().
	Context clientcmdapi.Context
	// CurrentContext is passed as the context name when
	// DeferredLoadingClientConfig builds its client config.
	CurrentContext string
	// Timeout is the request timeout as text: a non-negative integer is read
	// as seconds, anything else must parse with time.ParseDuration.
	Timeout string
}

ConfigOverrides很简单,就是携带了config的某些信息,这些信息的优先级最高。

DeferredLoadingClientConfig

使用NewNonInteractiveDeferredLoadingClientConfig()可以生成一个DeferredLoadingClientConfig,DeferredLoadingClientConfig定义在/pkg/client/unversioned/clientcmd/merged_client_builder.go中:

1
2
3
4
5
6
7
8
9
10
11
// DeferredLoadingClientConfig builds its underlying ClientConfig on demand in
// createClientConfig() and keeps the result for later calls.
type DeferredLoadingClientConfig struct {
	// loader supplies the merged kubeconfig via Load() and answers IsDefaultConfig().
	loader ClientConfigLoader
	// overrides is handed to the ClientConfig that createClientConfig() builds.
	overrides *ConfigOverrides
	// fallbackReader, when non-nil, makes createClientConfig() build an
	// interactive client config instead of a non-interactive one.
	fallbackReader io.Reader
	// clientConfig holds the ClientConfig built by createClientConfig(); nil until then.
	clientConfig ClientConfig
	// loadingLock is held while clientConfig is being created.
	loadingLock sync.Mutex
	// provided for testing
	icc InClusterConfig
}

DeferredLoadingClientConfig可以把kubeconfig转换成DirectClientConfig,继而DirectClientConfig可以转换成client的config。之所以称为DeferredLoadingClientConfig,是因为kubeconfig的加载被推迟到真正需要时才进行:createClientConfig()只有在clientConfig为空时才调用loader.Load(),并把生成的DirectClientConfig缓存在clientConfig中,之后再通过DirectClientConfig去生成client的config。

ClientConfig()

ClientConfig()先调用createClientConfig()方法生成DirectClientConfig,然后再调用DirectClientConfig的ClientConfig()方法生成client的Config,并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// ClientConfig implements ClientConfig.
//
// It obtains the underlying client config from createClientConfig() (which
// performs the kubeconfig Load()) and asks it for a restclient.Config:
//   - any error other than an "empty config" error is returned immediately;
//   - a non-nil config that differs from the loader's defaults is returned;
//   - otherwise the in-cluster configuration is used when it is possible;
//   - failing that, the merged result (config and error) is returned as-is.
func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
	direct, err := config.createClientConfig()
	if err != nil {
		return nil, err
	}

	merged, mergeErr := direct.ClientConfig()
	if mergeErr != nil {
		// Only an empty-config error may fall through to the in-cluster check.
		if !IsEmptyConfig(mergeErr) {
			return nil, mergeErr
		}
	} else if merged != nil && !config.loader.IsDefaultConfig(merged) {
		// Valid and not merely the defaults: use it.
		return merged, nil
	}

	// Either nothing usable was loaded or the result equals the defaults, so
	// prefer the in-cluster configuration when it is available.
	if config.icc.Possible() {
		glog.V(4).Infof("Using in-cluster configuration")
		return config.icc.ClientConfig()
	}

	return merged, mergeErr
}

createClientConfig()

createClientConfig()先通过loader.Load()把kubeconfig合并成mergedConfig(类型为Config,定义在/pkg/client/unversioned/clientcmd/api/types.go)。然后使用NewNonInteractiveClientConfig()生成DirectClientConfig,并把DirectClientConfig赋值给DeferredLoadingClientConfig的clientConfig,返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// createClientConfig builds the underlying ClientConfig on first use and
// returns the stored one on later calls.
//
// The merged kubeconfig comes from config.loader.Load(). When a fallbackReader
// is set an interactive client config is created, otherwise a non-interactive
// one; both receive overrides.CurrentContext as the context name. A Load()
// error is returned and nothing is stored, so the next call tries again.
//
// loadingLock is held for the whole call, including the fast path, so that
// config.clientConfig is never read while another goroutine is writing it.
func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
	config.loadingLock.Lock()
	defer config.loadingLock.Unlock()

	if config.clientConfig != nil {
		return config.clientConfig, nil
	}

	// loader.Load() merges the (possibly several) kubeconfig files into one.
	mergedConfig, err := config.loader.Load()
	if err != nil {
		return nil, err
	}

	var mergedClientConfig ClientConfig
	if config.fallbackReader != nil {
		mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
	} else {
		// NewNonInteractiveClientConfig returns a *DirectClientConfig.
		mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
	}
	config.clientConfig = mergedClientConfig

	return mergedClientConfig, nil
}

NewNonInteractiveClientConfig()

NewNonInteractiveClientConfig()或NewInteractiveClientConfig()可以生成DirectClientConfig,两者的区别是是否传入fallbackReader。fallbackReader是认证信息不足调用的(还未做深入研究)。两个函数定义在/pkg/client/unversioned/clientcmd/client_config.go中:

1
2
3
4
5
6
7
8
9
// NewDefaultClientConfig returns a DirectClientConfig for config whose context
// name is config.CurrentContext. It has no fallback reader and uses
// NewDefaultClientConfigLoadingRules() for config access.
func NewDefaultClientConfig(config clientcmdapi.Config, overrides *ConfigOverrides) ClientConfig {
	return &DirectClientConfig{
		config:         config,
		contextName:    config.CurrentContext,
		overrides:      overrides,
		fallbackReader: nil,
		configAccess:   NewDefaultClientConfigLoadingRules(),
	}
}
// NewNonInteractiveClientConfig returns a DirectClientConfig for config that
// uses the given contextName and configAccess and has no fallback reader for
// auth information.
func NewNonInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, configAccess ConfigAccess) ClientConfig {
	return &DirectClientConfig{
		config:         config,
		contextName:    contextName,
		overrides:      overrides,
		fallbackReader: nil,
		configAccess:   configAccess,
	}
}

DirectClientConfig

DirectClientConfig可以直接生成client的config,定义在/pkg/client/unversioned/clientcmd/client_config.go中:

1
2
3
4
5
6
7
8
// DirectClientConfig is a ClientConfig interface that is backed by a clientcmdapi.Config, options overrides, and an optional fallbackReader for auth information
type DirectClientConfig struct {
	// config is the kubeconfig content this client config is built from.
	config clientcmdapi.Config
	// contextName is the context name supplied by the constructor.
	contextName string
	// overrides holds values consulted alongside config (cluster name,
	// cluster info, timeout).
	overrides *ConfigOverrides
	// fallbackReader is passed to getUserIdentificationPartialConfig(); it is
	// nil for the non-interactive constructors.
	fallbackReader io.Reader
	// configAccess, when non-nil, is used to create the auth-provider
	// persister via PersisterForUser().
	configAccess ConfigAccess
}

ClientConfig()

ClientConfig()方法使用clientConfig := &restclient.Config{},并把DirectClientConfig上的内容填充到clientConfig中,clientConfig就是之前称呼的client的config。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// ClientConfig implements ClientConfig.
//
// It resolves the auth info, context and cluster, confirms the config is
// usable, and then fills a new restclient.Config: Host from the cluster's
// Server, Timeout from overrides.Timeout, Impersonate from the auth info and,
// when the transport is TLS, the user and server identification settings.
// Any lookup, validation or parse failure is returned with a nil config.
func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
	// check that getAuthInfo, getContext, and getCluster do not return an error.
	// Do this before checking if the current config is usable in the event that an
	// AuthInfo, Context, or Cluster config with user-defined names are not found.
	// This provides a user with the immediate cause for error if one is found
	configAuthInfo, err := config.getAuthInfo()
	if err != nil {
		return nil, err
	}
	_, err = config.getContext()
	if err != nil {
		return nil, err
	}
	configClusterInfo, err := config.getCluster()
	if err != nil {
		return nil, err
	}
	if err := config.ConfirmUsable(); err != nil {
		return nil, err
	}
	clientConfig := &restclient.Config{}
	clientConfig.Host = configClusterInfo.Server
	if len(config.overrides.Timeout) > 0 {
		// A bare non-negative integer is taken as seconds; anything else must
		// be a duration string accepted by time.ParseDuration.
		if i, err := strconv.ParseInt(config.overrides.Timeout, 10, 64); err == nil && i >= 0 {
			clientConfig.Timeout = time.Duration(i) * time.Second
		} else if requestTimeout, err := time.ParseDuration(config.overrides.Timeout); err == nil {
			clientConfig.Timeout = requestTimeout
		} else {
			return nil, fmt.Errorf("Invalid value for option '--request-timeout'. Value must be a single integer, or an integer followed by a corresponding time unit (e.g. 1s | 2m | 3h)")
		}
	}
	// When Host parses as a non-opaque request URI with a path longer than one
	// character, rewrite it without its query string and fragment.
	if u, err := url.ParseRequestURI(clientConfig.Host); err == nil && u.Opaque == "" && len(u.Path) > 1 {
		u.RawQuery = ""
		u.Fragment = ""
		clientConfig.Host = u.String()
	}
	if len(configAuthInfo.Impersonate) > 0 {
		clientConfig.Impersonate = configAuthInfo.Impersonate
	}
	// only try to read the auth information if we are secure
	if restclient.IsConfigTransportTLS(*clientConfig) {
		var err error
		// mergo is a first write wins for map value and a last writing wins for interface values
		// NOTE: This behavior changed with https://github.com/imdario/mergo/commit/d304790b2ed594794496464fadd89d2bb266600a.
		// Our mergo.Merge version is older than this change.
		var persister restclient.AuthProviderConfigPersister
		if config.configAccess != nil {
			authInfoName, _ := config.getAuthInfoName()
			persister = PersisterForUser(config.configAccess, authInfoName)
		}
		userAuthPartialConfig, err := getUserIdentificationPartialConfig(configAuthInfo, config.fallbackReader, persister)
		if err != nil {
			return nil, err
		}
		// NOTE(review): the error returned by mergo.Merge is not checked here or below.
		mergo.Merge(clientConfig, userAuthPartialConfig)
		serverAuthPartialConfig, err := getServerIdentificationPartialConfig(configAuthInfo, configClusterInfo)
		if err != nil {
			return nil, err
		}
		mergo.Merge(clientConfig, serverAuthPartialConfig)
	}
	return clientConfig, nil
}

getCluster()

在这些getXXX()方法中,会优先使用overrides中的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// getCluster assembles the effective clientcmdapi.Cluster.
//
// Three sources are merged with mergo, in this order: overrides.ClusterDefaults,
// the cluster registered in the loaded config under the name from
// getClusterName(), and overrides.ClusterInfo. If no cluster with that name
// exists and the name is required, an error is returned.
func (config *DirectClientConfig) getCluster() (clientcmdapi.Cluster, error) {
	name, required := config.getClusterName()

	var merged clientcmdapi.Cluster
	mergo.Merge(&merged, config.overrides.ClusterDefaults)

	fromConfig, found := config.config.Clusters[name]
	switch {
	case found:
		mergo.Merge(&merged, fromConfig)
	case required:
		return clientcmdapi.Cluster{}, fmt.Errorf("cluster %q does not exist", name)
	}

	mergo.Merge(&merged, config.overrides.ClusterInfo)

	// An --insecure-skip-tls-verify=true override that brings no CA or CA data
	// of its own must clear any CA already merged in; otherwise a kubeconfig
	// that references a CA would fail with "CA and insecure-skip-tls-verify
	// couldn't both be set".
	override := config.overrides.ClusterInfo
	overrideHasNoCA := len(override.CertificateAuthority) == 0 && len(override.CertificateAuthorityData) == 0
	if override.InsecureSkipTLSVerify && overrideHasNoCA {
		merged.CertificateAuthority = ""
		merged.CertificateAuthorityData = nil
	}

	return merged, nil
}
// getClusterName returns the cluster name to use and a flag telling where it
// came from: true when the name was set explicitly through
// overrides.Context.Cluster, false when it is taken from the context returned
// by getContext().
func (config *DirectClientConfig) getClusterName() (string, bool) {
	if override := config.overrides.Context.Cluster; len(override) != 0 {
		return override, true
	}

	current, _ := config.getContext()
	return current.Cluster, false
}

总结

ClientConfigLoadingRules作为ClientConfigLoader可以把kubeconfig文件(可能多个)转换成clientcmdapi.Config(定义在types.go中);
DeferredLoadingClientConfig封装有ClientConfigLoadingRules,在获取到clientcmdapi.Config之后,会转换成DirectClientConfig,然后生成restclient.Config。