kubernetes-client Analysis (3) - dynamicClient - v1.5.2

The earlier post "RESTClient, DynamicClient and ClientSet Demo" covered how DynamicClient is used. This analysis looks at how dynamicClient is implemented.

dynamicClient Definition

When using RESTClient, the user has to set many parameters manually, as shown below:

pod := v1.Pod{}
err = restClient.Get().Resource("pods").Namespace("default").Name("nginx-1487191267-b4w5j").Do().Into(&pod)
if err != nil {
    fmt.Println("error")
}
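The chained calls in the snippet above effectively assemble a request path of the form /api/v1/namespaces/{namespace}/pods/{name}. A minimal sketch of that path construction (podPath is a hypothetical helper written for illustration; the real builder lives in RESTClient's request code):

```go
package main

import (
	"fmt"
	"path"
)

// podPath mimics how RESTClient's request builder joins its parts
// (API root, namespace scope, resource, name) into one URL path.
// Illustrative only; not the actual RESTClient implementation.
func podPath(namespace, name string) string {
	return path.Join("/api/v1", "namespaces", namespace, "pods", name)
}

func main() {
	fmt.Println(podPath("default", "nginx-1487191267-b4w5j"))
	// → /api/v1/namespaces/default/pods/nginx-1487191267-b4w5j
}
```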

Read more

kubernetes-client Analysis (2) - restclient - v1.5.2

Config

Config holds the apiserver-related information and can be used to generate a RESTClient. It is defined in /pkg/client/restclient/config.go:

// Config holds apiserver-related information.
// Example value:
// &{https://kubernetes:6443 /api {application/json application/json v1 0xc420109240} <nil> <nil> {/etc/kubernetes/security/serverkey/server.crt /etc/kubernetes/security/serverkey/server.key /etc/kubernetes/security/serverkey/ca.crt [] [] []} false kubectl/v1.5.2+08e0995 (linux/amd64) kubernetes/08e0995 <nil> <nil> 0 0 <nil> 0s}
type Config struct {
    // Host must be a host string, a host:port pair, or a URL to the base of the apiserver.
    // If a URL is given then the (optional) Path of that URL represents a prefix that must
    // be appended to all request URIs used to access the apiserver. This allows a frontend
    // proxy to easily relocate all of the apiserver endpoints.
    Host string
    // APIPath is a sub-path that points to an API root.
    APIPath string
    // Prefix is the sub path of the server. If not specified, the client will set
    // a default value. Use "/" to indicate the server root should be used
    Prefix string
    // ContentConfig contains settings that affect how objects are transformed when
    // sent to the server.
    ContentConfig
    // Server requires Basic authentication
    Username string
    Password string
    // Server requires Bearer authentication. This client will not attempt to use
    // refresh tokens for an OAuth2 flow.
    // TODO: demonstrate an OAuth2 compatible client.
    BearerToken string
    // Impersonate is the username that this RESTClient will impersonate
    Impersonate string
    // Server requires plugin-specified authentication.
    AuthProvider *clientcmdapi.AuthProviderConfig
    // Callback to persist config for AuthProvider.
    AuthConfigPersister AuthProviderConfigPersister
    // TLSClientConfig contains settings to enable transport layer security
    TLSClientConfig
    // Server should be accessed without verifying the TLS
    // certificate. For testing only.
    Insecure bool
    // UserAgent is an optional field that specifies the caller of this request.
    UserAgent string
    // Transport may be used for custom HTTP behavior. This attribute may not
    // be specified with the TLS client certificate options. Use WrapTransport
    // for most client level operations.
    Transport http.RoundTripper
    // WrapTransport will be invoked for custom HTTP behavior after the underlying
    // transport is initialized (either the transport created from TLSClientConfig,
    // Transport, or http.DefaultTransport). The config may layer other RoundTrippers
    // on top of the returned RoundTripper.
    WrapTransport func(rt http.RoundTripper) http.RoundTripper
    // QPS indicates the maximum QPS to the master from this client.
    // If it's zero, the created RESTClient will use DefaultQPS: 5
    QPS float32
    // Maximum burst for throttle.
    // If it's zero, the created RESTClient will use DefaultBurst: 10.
    Burst int
    // Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
    RateLimiter flowcontrol.RateLimiter
    // The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
    Timeout time.Duration
    // Version forces a specific version to be used (if registered)
    // Do we need this?
    // Version string
}
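The QPS and Burst field comments imply a defaulting step when either is left at zero. A small sketch of that fallback behavior (effectiveRates is a hypothetical helper; the default values 5 and 10 come from the field comments above, not from reading the client construction code):

```go
package main

import "fmt"

const (
	defaultQPS   float32 = 5  // per the QPS field comment
	defaultBurst int     = 10 // per the Burst field comment
)

// effectiveRates returns the rates a RESTClient would actually use,
// falling back to the documented defaults when the Config leaves
// the fields at their zero values.
func effectiveRates(qps float32, burst int) (float32, int) {
	if qps == 0 {
		qps = defaultQPS
	}
	if burst == 0 {
		burst = defaultBurst
	}
	return qps, burst
}

func main() {
	qps, burst := effectiveRates(0, 0)
	fmt.Println(qps, burst) // zero values fall back to 5 10
}
```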

Read more

kubernetes-client Analysis (1) - kubeconfig - v1.5.2

Next, we start analyzing the Kubernetes client code. An earlier post, "RESTClient, DynamicClient and ClientSet Demo" ( https://fankangbest.github.io/2017/07/15/RESTClient-DynamicClient%E5%92%8CClientSet-Demo/ ), introduced how the three clients are used. All three generate their config with the following code:

kubeconfig := flag.String("kubeconfig", "/root/.kube/config", "Path to a kube config. Only required if out-of-cluster.")
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
    fmt.Println("BuildConfigFromFlags error")
}

Read more

kubernetes-client-python Usage - v1.0.1

client-python

kubernetes-incubator/client-python is the Python client for Kubernetes; it lets users access a Kubernetes cluster conveniently from Python. This analysis walks through typical scenarios to show how client-python is used.

Read more

Go Package Usage - bufio

(Adapted from https://studygolang.com/articles/4367)

Reader

func NewReaderSize(rd io.Reader, size int) *Reader
NewReaderSize() wraps rd into a bufio.Reader with a buffer of the given size.
func NewReader(rd io.Reader) *Reader
NewReader() is equivalent to NewReaderSize(rd, 4096).

Read more

YAML Format Conversion Analysis - YAMLOrJSONDecoder

The earlier kubectl printer post analyzed how kubectl prints content in YAML format; this analysis covers how kubectl reads input from YAML files and converts it.

FileVisitor

When using kubectl create -f, resources can be created from a file in either JSON or YAML format. YAML supports the "---" separator, so multiple YAML documents can be placed in a single file. Reading the file is handled by FileVisitor, defined in /pkg/kubectl/resource/visitor.go:

// FileVisitor is a visitor that reads from a file.
// It is a wrapper around StreamVisitor.
type FileVisitor struct {
    Path string
    *StreamVisitor
}

// Visit in a FileVisitor is just taking care of opening/closing files
func (v *FileVisitor) Visit(fn VisitorFunc) error {
    var f *os.File
    if v.Path == constSTDINstr {
        f = os.Stdin
    } else {
        var err error
        if f, err = os.Open(v.Path); err != nil {
            return err
        }
    }
    defer f.Close()
    v.StreamVisitor.Reader = f
    return v.StreamVisitor.Visit(fn)
}
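The "---" handling mentioned above can be sketched as a simple document splitter (splitDocs is a hypothetical simplification; in kubectl the actual splitting and decoding is done by YAMLOrJSONDecoder on the stream that FileVisitor opens):

```go
package main

import (
	"fmt"
	"strings"
)

// splitDocs breaks a multi-document YAML stream on lines consisting
// of the "---" separator, mirroring in simplified form what happens
// to a kubectl create -f input file containing several manifests.
func splitDocs(in string) []string {
	var docs []string
	var cur []string
	for _, line := range strings.Split(in, "\n") {
		if strings.TrimSpace(line) == "---" {
			docs = append(docs, strings.Join(cur, "\n"))
			cur = nil
			continue
		}
		cur = append(cur, line)
	}
	if len(cur) > 0 {
		docs = append(docs, strings.Join(cur, "\n"))
	}
	return docs
}

func main() {
	manifest := "kind: Pod\n---\nkind: Service"
	fmt.Println(len(splitDocs(manifest))) // 2
}
```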

Read more

kube-scheduler Analysis (2) - scheduler - v1.5.2

This analysis covers the core of kube-scheduler, the scheduler itself: how a scheduler is created, and how it schedules Pods.

ConfigFactory

ConfigFactory can generate a Scheduler Config. Let's first look at how a ConfigFactory is created; the constructor is defined in /plugin/pkg/scheduler/factory/factory.go:

// NewConfigFactory builds a ConfigFactory.
func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory {
    stopEverything := make(chan struct{})
    schedulerCache := schedulercache.New(30*time.Second, stopEverything)
    // TODO: pass this in as an argument...
    informerFactory := informers.NewSharedInformerFactory(client, 0)
    pvcInformer := informerFactory.PersistentVolumeClaims()
    c := &ConfigFactory{
        Client:             client,
        PodQueue:           cache.NewFIFO(cache.MetaNamespaceKeyFunc),
        ScheduledPodLister: &cache.StoreToPodLister{},
        informerFactory:    informerFactory,
        // Only nodes in the "Ready" condition with status == "True" are schedulable
        NodeLister:       &cache.StoreToNodeLister{},
        PVLister:         &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
        PVCLister:        pvcInformer.Lister(),
        pvcPopulator:     pvcInformer.Informer().GetController(),
        ServiceLister:    &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
        ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
        ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
        schedulerCache:   schedulerCache,
        StopEverything:   stopEverything,
        SchedulerName:    schedulerName,
        HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
        FailureDomains:                 failureDomains,
    }
    c.PodLister = schedulerCache
    // On add/delete to the scheduled pods, remove from the assumed pods.
    // We construct this here instead of in CreateFromKeys because
    // ScheduledPodLister is something we provide to plug in functions that
    // they may need to call.
    // Build the scheduledPod Indexer and Controller.
    c.ScheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
        c.createAssignedNonTerminatedPodLW(),
        &api.Pod{},
        0,
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addPodToCache,
            UpdateFunc: c.updatePodInCache,
            DeleteFunc: c.deletePodFromCache,
        },
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    // Build the Node Controller.
    c.NodeLister.Store, c.nodePopulator = cache.NewInformer(
        c.createNodeLW(),
        &api.Node{},
        0,
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
    )
    // TODO(harryz) need to fill all the handlers here and below for equivalence cache
    // Build the PV Controller.
    c.PVLister.Store, c.pvPopulator = cache.NewInformer(
        c.createPersistentVolumeLW(),
        &api.PersistentVolume{},
        0,
        cache.ResourceEventHandlerFuncs{},
    )
    // Build the Service Controller.
    c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
        c.createServiceLW(),
        &api.Service{},
        0,
        cache.ResourceEventHandlerFuncs{},
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    // Build the RC Controller.
    c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
        c.createControllerLW(),
        &api.ReplicationController{},
        0,
        cache.ResourceEventHandlerFuncs{},
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )
    return c
}
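The ResourceEventHandlerFuncs values passed to each informer above follow a plain callback pattern: any of the three hooks may be nil, and the informer calls whichever one matches the event. A standalone sketch of that pattern (handlerFuncs and dispatch are hypothetical stand-ins, not the client-go types):

```go
package main

import "fmt"

// handlerFuncs mirrors the shape of cache.ResourceEventHandlerFuncs:
// optional callbacks for add, update, and delete events.
type handlerFuncs struct {
	AddFunc    func(obj string)
	UpdateFunc func(oldObj, newObj string)
	DeleteFunc func(obj string)
}

// dispatch invokes the matching callback if it is set, the way an
// informer delivers watch events to its registered handlers; unset
// callbacks mean the event is silently ignored.
func (h handlerFuncs) dispatch(event, oldObj, newObj string) {
	switch event {
	case "add":
		if h.AddFunc != nil {
			h.AddFunc(newObj)
		}
	case "update":
		if h.UpdateFunc != nil {
			h.UpdateFunc(oldObj, newObj)
		}
	case "delete":
		if h.DeleteFunc != nil {
			h.DeleteFunc(oldObj)
		}
	}
}

func main() {
	var log []string
	h := handlerFuncs{
		AddFunc: func(obj string) { log = append(log, "add:"+obj) },
	}
	h.dispatch("add", "", "pod-a")
	h.dispatch("delete", "pod-a", "") // DeleteFunc is nil, so ignored
	fmt.Println(log) // [add:pod-a]
}
```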

Read more

kube-scheduler Analysis (1) - The init Process - v1.5.2

kube-scheduler is responsible for scheduling Pods in Kubernetes. Its main flow is: fetch an unscheduled pod, filter out the nodes that satisfy the pod's requirements based on the pod's information, score those candidate nodes, and finally use the highest-scoring node as the pod's scheduling result. kube-scheduler therefore has two families of algorithms: one that filters nodes, called predicates, and one that scores them, called priorities. This analysis introduces how kube-scheduler manages these algorithms.
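The filter-then-score flow described above can be sketched in a few lines (the schedule helper and the toy predicate and priority functions below are hypothetical, for illustration only; they are not the scheduler's real algorithm registry):

```go
package main

import "fmt"

// schedule runs the two algorithm phases the post describes:
// predicates filter out unsuitable nodes, then priorities score
// the survivors and the highest-scoring node wins. The bool result
// reports whether any node passed the predicate phase.
func schedule(nodes []string, predicate func(string) bool, priority func(string) int) (string, bool) {
	best, bestScore, found := "", -1, false
	for _, n := range nodes {
		if !predicate(n) { // predicate phase: filter
			continue
		}
		if s := priority(n); s > bestScore { // priority phase: score
			best, bestScore, found = n, s, true
		}
	}
	return best, found
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	scores := map[string]int{"node-1": 3, "node-2": 9, "node-3": 5}
	// node-2 is filtered out by the predicate, so the best remaining
	// score wins even though node-2 scores higher overall.
	winner, ok := schedule(nodes,
		func(n string) bool { return n != "node-2" },
		func(n string) int { return scores[n] },
	)
	fmt.Println(winner, ok) // node-3 true
}
```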

Read more

kube-proxy Analysis (2) - proxier - v1.5.2

This analysis covers only userspace mode.

proxySocket

The proxySocket-related code is defined in /pkg/proxy/userspace/proxysocket.go:

// Abstraction over TCP/UDP sockets which are proxied.
type proxySocket interface {
    // Addr gets the net.Addr for a proxySocket.
    Addr() net.Addr
    // Close stops the proxySocket from accepting incoming connections.
    // Each implementation should comment on the impact of calling Close
    // while sessions are active.
    Close() error
    // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
    ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier)
    // ListenPort returns the host port that the proxySocket is listening on
    ListenPort() int
}
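A simplified TCP type satisfying the listener-related methods of this interface (tcpProxySocket here is a sketch, not the proxier's real implementation, and the Kubernetes-specific ProxyLoop signature is omitted for brevity):

```go
package main

import (
	"fmt"
	"net"
)

// tcpProxySocket shows how a TCP listener can back the Addr, Close,
// and ListenPort methods of the proxySocket abstraction.
type tcpProxySocket struct {
	listener net.Listener
}

func (t *tcpProxySocket) Addr() net.Addr { return t.listener.Addr() }

func (t *tcpProxySocket) Close() error { return t.listener.Close() }

func (t *tcpProxySocket) ListenPort() int {
	return t.listener.Addr().(*net.TCPAddr).Port
}

// newTCPProxySocket listens on port 0, asking the kernel for any
// free port, similar to the proxier's dynamically assigned proxy ports.
func newTCPProxySocket() (*tcpProxySocket, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	return &tcpProxySocket{listener: l}, nil
}

func main() {
	s, err := newTCPProxySocket()
	if err != nil {
		panic(err)
	}
	defer s.Close()
	fmt.Println(s.ListenPort() > 0) // true
}
```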

Read more

kube-proxy Analysis (1) - config - v1.5.2

In Kubernetes, kube-proxy is responsible for directing service traffic to concrete pods. To do that, kube-proxy needs service and endpoint information from the apiserver, and obtaining this information is managed by config. Here "config" does not mean configuration in the usual sense; it should be understood as managing the sources of information. kube-proxy only supports getting information from the apiserver, whereas kubelet can get it from three channels: apiserver, file, and http. This analysis introduces kube-proxy's config, that is, how kube-proxy learns of service and endpoint changes from the apiserver and how it hands those changes to the handler functions.
kube-proxy has both a ServiceConfig and an EndpointConfig; we take ServiceConfig as the example.
ServiceConfig collects services information from each channel; after the mux merge() step, each channel's information is gathered under the corresponding source key in ServiceStore. Finally the Broadcaster merges the services from all sources and hands them to the registered handlers.
First, two concepts: Mux and Broadcaster.
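The merge-then-broadcast flow described above can be sketched with plain maps and callbacks (the merge and broadcast functions below are hypothetical simplifications of the real Mux and Broadcaster types):

```go
package main

import (
	"fmt"
	"sort"
)

// merge stores a source's service list under its source key, the way
// mux merge() accumulates per-channel updates into ServiceStore.
func merge(store map[string][]string, source string, services []string) {
	store[source] = services
}

// broadcast flattens every source's services into one combined list
// and hands it to each registered handler, like the Broadcaster does.
func broadcast(store map[string][]string, handlers ...func([]string)) {
	var all []string
	for _, svcs := range store {
		all = append(all, svcs...)
	}
	sort.Strings(all) // deterministic order for the handlers
	for _, h := range handlers {
		h(all)
	}
}

func main() {
	store := map[string][]string{}
	merge(store, "api", []string{"svc-a", "svc-b"})
	merge(store, "file", []string{"svc-c"})
	broadcast(store, func(all []string) { fmt.Println(all) })
	// [svc-a svc-b svc-c]
}
```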

Read more