之前在”RESTClient,DynamicClient和ClientSet Demo”分享过DynamicClient的用法。本次分析就介绍dynamicClient是如何实现的。

dynamicClient定义

在使用RESTClient时,需要用户自己设置很多的参数,如下所示:

1
2
3
4
5
pod := v1.Pod{}
err = restClient.Get().Resource("pods").Namespace("default").Name("nginx-1487191267-b4w5j").Do().Into(&pod)
if err != nil {
fmt.Println("error")
}

dynamicClient是对RESTClient的封装,使用起来比较方便:

1
2
3
4
obj, err = dynamicClient.Resource(resource, "default").Get("nginx-1487191267-b4w5j")
if err != nil {
fmt.Println("dynamicClient Resource error")
}

先来看下dynamicClient的定义,dynamicClient定义在/pkg/client/typed/dynamic/client.go中:

1
2
3
4
5
6
7
8
// ResourceClient is an API interface to a specific resource under a
// dynamic client.
type ResourceClient struct {
cl *restclient.RESTClient
resource *unversioned.APIResource
ns string
parameterCodec runtime.ParameterCodec
}

dynamicClient可以通过NewClient()生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// NewClient returns a new client based on the passed in config. The
// codec is ignored, as the dynamic client uses it's own codec.
func NewClient(conf *restclient.Config) (*Client, error) {
// avoid changing the original config
confCopy := *conf
conf = &confCopy
contentConfig := ContentConfig()
contentConfig.GroupVersion = conf.GroupVersion
if conf.NegotiatedSerializer != nil {
contentConfig.NegotiatedSerializer = conf.NegotiatedSerializer
}
conf.ContentConfig = contentConfig
if conf.APIPath == "" {
conf.APIPath = "/api"
}
if len(conf.UserAgent) == 0 {
conf.UserAgent = restclient.DefaultKubernetesUserAgent()
}
//***生成RESTClient***//
cl, err := restclient.RESTClientFor(conf)
if err != nil {
return nil, err
}
return &Client{cl: cl}, nil
}

可以看出,NewClient()先生成RESTClient,然后把RESTClient封装到Client中,并返回Client。

Client定义如下:

1
2
3
4
5
//***dynamicClient本质是一个RESTClient***//
type Client struct {
cl *restclient.RESTClient
parameterCodec runtime.ParameterCodec
}

Client有Resource()方法返回一个dynamicClient:

1
2
3
4
5
6
7
8
func (c *Client) Resource(resource *unversioned.APIResource, namespace string) *ResourceClient {
return &ResourceClient{
cl: c.cl,
resource: resource,
ns: namespace,
parameterCodec: c.parameterCodec,
}
}

所以我们可以通过Resource()设置dynamicClient的resource和ns字段。现在,我们得到了一个dynamicClient,该dynamicClient可以对某个命名空间下的某种资源进行操作。

dynamicClient方法

dynamicClient定义有对资源的各种操作方法。

List()

List()可以获取资源的列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// List returns a list of objects for this resource.
func (rc *ResourceClient) List(opts runtime.Object) (runtime.Object, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
//***调用RESTClient接口***//
return rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(opts, parameterEncoder).
Do().
Get()
}

Get()

Get()可以获取具体资源的详细信息。

1
2
3
4
5
6
7
8
9
10
11
func (rc *ResourceClient) Get(name string) (*runtime.Unstructured, error) {
result := new(runtime.Unstructured)
//***调用RESTClient方法***//
err := rc.cl.Get().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Do().
Into(result)
return result, err
}

Delete()

Delete()可以删除某具体资源。

1
2
3
4
5
6
7
8
9
10
11
// Delete deletes the resource with the specified name.
func (rc *ResourceClient) Delete(name string, opts *v1.DeleteOptions) error {
//***调用RESTClient方法***//
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Body(opts).
Do().
Error()
}

DeleteCollection()

DeleteCollection()可以删除多个资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// DeleteCollection deletes a collection of objects.
func (rc *ResourceClient) DeleteCollection(deleteOptions *v1.DeleteOptions, listOptions runtime.Object) error {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
return rc.cl.Delete().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(listOptions, parameterEncoder).
Body(deleteOptions).
Do().
Error()
}

Create()

1
2
3
4
5
6
7
8
9
10
11
12
// Create creates the provided resource.
func (rc *ResourceClient) Create(obj *runtime.Unstructured) (*runtime.Unstructured, error) {
result := new(runtime.Unstructured)
//***调用RESTClient方法***//
err := rc.cl.Post().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Body(obj).
Do().
Into(result)
return result, err
}

Update()

Update()可以更新某资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Update updates the provided resource.
func (rc *ResourceClient) Update(obj *runtime.Unstructured) (*runtime.Unstructured, error) {
result := new(runtime.Unstructured)
if len(obj.GetName()) == 0 {
return result, errors.New("object missing name")
}
err := rc.cl.Put().
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(obj.GetName()).
Body(obj).
Do().
Into(result)
return result, err
}

Watch()

Watch()可以对某资源进行变化监控,返回一个watcher。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Watch returns a watch.Interface that watches the resource.
func (rc *ResourceClient) Watch(opts runtime.Object) (watch.Interface, error) {
parameterEncoder := rc.parameterCodec
if parameterEncoder == nil {
parameterEncoder = defaultParameterEncoder
}
//***需要设置RECTClient的pathPrefix为"watch"***//
return rc.cl.Get().
Prefix("watch").
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
VersionedParams(opts, parameterEncoder).
Watch()
}

Patch()

Patch()可以对某资源进行局部更新。

1
2
3
4
5
6
7
8
9
10
11
func (rc *ResourceClient) Patch(name string, pt api.PatchType, data []byte) (*runtime.Unstructured, error) {
result := new(runtime.Unstructured)
err := rc.cl.Patch(pt).
NamespaceIfScoped(rc.ns, rc.resource.Namespaced).
Resource(rc.resource.Name).
Name(name).
Body(data).
Do().
Into(result)
return result, err
}

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main
import (
"flag"
"fmt"
"reflect"
"encoding/json"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/api/unversioned"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/rest"
)
func main() {
kubeconfig := flag.String("kubeconfig", "/root/.kube/config", "Path to a kube config. Only required if out-of-cluster.")
fmt.Println(kubeconfig)
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
fmt.Println("error1")
}
// 生成dynamicClient
gv := &unversioned.GroupVersion{"", "v1"}
resource := &unversioned.APIResource{Name: "pods", Namespaced: true}
config.ContentConfig = rest.ContentConfig{GroupVersion: gv}
config.APIPath = "/api"
dynamicClient, err := dynamic.NewClient(config)
if err != nil {
fmt.Println("dynamic NewCLient error")
}
// 获取所有namespace的pod列表
obj, err := dynamicClient.Resource(resource, "").List(&v1.ListOptions{})
if err != nil {
fmt.Println("dynamicClient Resource error")
}
js, err := json.Marshal(reflect.ValueOf(obj).Elem().Interface())
if err != nil {
fmt.Println("error")
}
podlist := v1.PodList{}
json.Unmarshal(js, &podlist)
fmt.Println(podlist)
fmt.Println("------------------------")
// 获取具体具体pod
obj, err = dynamicClient.Resource(resource, "default").Get("nginx-1487191267-b4w5j")
if err != nil {
fmt.Println("dynamicClient Resource error")
}
js, err = json.Marshal(obj)
if err != nil {
fmt.Println("error")
}
pod := v1.Pod{}
json.Unmarshal(js, &pod)
fmt.Println(pod)
}