Config
Config中含有apiserver相关的信息,可以生成RESTClient,定义在/pkg/client/restclient/config.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| type Config struct { Host string APIPath string Prefix string ContentConfig Username string Password string BearerToken string Impersonate string AuthProvider *clientcmdapi.AuthProviderConfig AuthConfigPersister AuthProviderConfigPersister TLSClientConfig Insecure bool UserAgent string Transport http.RoundTripper WrapTransport func(rt http.RoundTripper) http.RoundTripper // QPS indicates the maximum QPS to the master from this client. // If it's zero, the created RESTClient will use DefaultQPS: 5 QPS float32 // Maximum burst for throttle. // If it's zero, the created RESTClient will use DefaultBurst: 10. Burst int // Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst RateLimiter flowcontrol.RateLimiter // The maximum length of time to wait before giving up on a server request. A value of zero means no timeout. Timeout time.Duration // Version forces a specific version to be used (if registered) // Do we need this? // Version string }
|
可以通过调用RESTClientFor()或UnversionedRESTClientFor()来生成restClient,以RESTClientFor()为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| func RESTClientFor(config *Config) (*RESTClient, error) { if config.GroupVersion == nil { return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient") } if config.NegotiatedSerializer == nil { return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient") } qps := config.QPS if config.QPS == 0.0 { qps = DefaultQPS } burst := config.Burst if config.Burst == 0 { burst = DefaultBurst } baseURL, versionedAPIPath, err := defaultServerUrlFor(config) if err != nil { return nil, err } transport, err := TransportFor(config) if err != nil { return nil, err } var httpClient *http.Client if transport != http.DefaultTransport { httpClient = &http.Client{Transport: transport} if config.Timeout > 0 { httpClient.Timeout = config.Timeout } } return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient) }
|
在RESTClientFor()中,会使用httpClient = &http.Client{Transport: transport}
生成httpClient,这以后会在Go语言http包分析中会介绍。RestClientFor()最后调用的是NewRESTClient()生成RESTClient。
RestClient
RESTClient可以生成Request,定义在/pkg/client/restclient/client.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| type RESTClient struct { base *url.URL versionedAPIPath string contentConfig ContentConfig serializers Serializers createBackoffMgr func() BackoffManager // TODO extract this into a wrapper interface via the RESTClient interface in kubectl. Throttle flowcontrol.RateLimiter // Set specific behavior of the client. If not set http.DefaultClient will be used. Client *http.Client }
|
NewRESTClient()定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) { base := *baseURL if !strings.HasSuffix(base.Path, "/") { base.Path += "/" } base.RawQuery = "" base.Fragment = "" if config.GroupVersion == nil { config.GroupVersion = &unversioned.GroupVersion{} } if len(config.ContentType) == 0 { config.ContentType = "application/json" } serializers, err := createSerializers(config) if err != nil { return nil, err } var throttle flowcontrol.RateLimiter if maxQPS > 0 && rateLimiter == nil { throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst) } else if rateLimiter != nil { throttle = rateLimiter } return &RESTClient{ base: &base, versionedAPIPath: versionedAPIPath, contentConfig: config, serializers: *serializers, createBackoffMgr: readExpBackoffConfig, Throttle: throttle, Client: client, }, nil }
|
RESTClient可以通过Post(), Put(), Patch(), Get(), Delete()设置请求的动作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (c *RESTClient) Post() *Request { return c.Verb("POST") } func (c *RESTClient) Put() *Request { return c.Verb("PUT") } func (c *RESTClient) Patch(pt api.PatchType) *Request { return c.Verb("PATCH").SetHeader("Content-Type", string(pt)) } func (c *RESTClient) Get() *Request { return c.Verb("GET") } func (c *RESTClient) Delete() *Request { return c.Verb("DELETE") }
|
这些方法都调用了Verb()方法:
1 2 3 4 5 6 7 8 9
| func (c *RESTClient) Verb(verb string) *Request { backoff := c.createBackoffMgr() if c.Client == nil { return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle) } return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle) }
|
在Verb()中,会调用NewRequest()函数生成Request。
Request
Request可以执行一个具体的请求。Request定义在/pkg/client/restclient/request.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| type Request struct { client HTTPClient verb string baseURL *url.URL content ContentConfig serializers Serializers pathPrefix string subpath string params url.Values headers http.Header namespace string namespaceSet bool resource string resourceName string subresource string selector labels.Selector timeout time.Duration err error body io.Reader req *http.Request resp *http.Response backoffMgr BackoffManager throttle flowcontrol.RateLimiter }
|
Request可以通过NewRequest()生成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request { if backoff == nil { glog.V(2).Infof("Not implementing request backoff strategy.") backoff = &NoBackoff{} } pathPrefix := "/" if baseURL != nil { pathPrefix = path.Join(pathPrefix, baseURL.Path) } r := &Request{ client: client, verb: verb, baseURL: baseURL, pathPrefix: path.Join(pathPrefix, versionedAPIPath), content: content, serializers: serializers, backoffMgr: backoff, throttle: throttle, } switch { case len(content.AcceptContentTypes) > 0: r.SetHeader("Accept", content.AcceptContentTypes) case len(content.ContentType) > 0: r.SetHeader("Accept", content.ContentType+", */*") } return r }
|
Request的方法可以分成三类:
- 设置Request结构体字段的方法;
- 生成URL的方法;
- 执行请求的方法。
先来介绍设置字段的方法。
Prefix()
Prefix()可以在原始pathPrefix后增加内容。
1 2 3 4 5 6 7 8
| func (r *Request) Prefix(segments ...string) *Request { if r.err != nil { return r } r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...)) return r }
|
Suffix()
Suffix()可以在原始subpath后增加内容。
1 2 3 4 5 6 7 8
| func (r *Request) Suffix(segments ...string) *Request { if r.err != nil { return r } r.subpath = path.Join(r.subpath, path.Join(segments...)) return r }
|
Resource()
设置resource字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (r *Request) Resource(resource string) *Request { if r.err != nil { return r } if len(r.resource) != 0 { r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource) return r } if msgs := pathvalidation.IsValidPathSegmentName(resource); len(msgs) != 0 { r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs) return r } r.resource = resource return r }
|
SubResource()
设置subresource字段,subresource,如”/api/v1/namespaces/{namespace}/resourcequotas/{name}/status”,status就是subresource。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func (r *Request) SubResource(subresources ...string) *Request { if r.err != nil { return r } subresource := path.Join(subresources...) if len(r.subresource) != 0 { r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource) return r } for _, s := range subresources { if msgs := pathvalidation.IsValidPathSegmentName(s); len(msgs) != 0 { r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs) return r } } r.subresource = subresource return r }
|
Name()
设置resourceName字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (r *Request) Name(resourceName string) *Request { if r.err != nil { return r } if len(resourceName) == 0 { r.err = fmt.Errorf("resource name may not be empty") return r } if len(r.resourceName) != 0 { r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName) return r } if msgs := pathvalidation.IsValidPathSegmentName(resourceName); len(msgs) != 0 { r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs) return r } r.resourceName = resourceName return r }
|
Namespace()
设置namespaceSet和namespace字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (r *Request) Namespace(namespace string) *Request { if r.err != nil { return r } if r.namespaceSet { r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace) return r } if msgs := pathvalidation.IsValidPathSegmentName(namespace); len(msgs) != 0 { r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs) return r } r.namespaceSet = true r.namespace = namespace return r }
|
FieldsSelectorParam()
处理fields selector。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (r *Request) FieldsSelectorParam(s fields.Selector) *Request { if r.err != nil { return r } if s == nil { return r } if s.Empty() { return r } s2, err := s.Transform(func(field, value string) (newField, newValue string, err error) { return fieldMappings.filterField(r.content.GroupVersion, r.resource, field, value) }) if err != nil { r.err = err return r } return r.setParam(unversioned.FieldSelectorQueryParam(r.content.GroupVersion.String()), s2.String()) }
|
LabelsSelectorParam()
处理label selector。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (r *Request) LabelsSelectorParam(s labels.Selector) *Request { if r.err != nil { return r } if s == nil { return r } if s.Empty() { return r } return r.setParam(unversioned.LabelSelectorQueryParam(r.content.GroupVersion.String()), s.String()) }
|
setParam()
设置params字段。
1 2 3 4 5 6 7 8 9 10 11 12
| func (r *Request) setParam(paramName, value string) *Request { if specialParams.Has(paramName) { r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName) return r } if r.params == nil { r.params = make(url.Values) } r.params[paramName] = append(r.params[paramName], value) return r }
|
设置headers字段。
1 2 3 4 5 6 7 8
| func (r *Request) SetHeader(key, value string) *Request { if r.headers == nil { r.headers = http.Header{} } r.headers.Set(key, value) return r }
|
Timeout()
设置timeout字段。
1 2 3 4 5 6 7 8 9 10
| func (r *Request) Timeout(d time.Duration) *Request { if r.err != nil { return r } r.timeout = d return r }
|
Body()
设置body字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (r *Request) Body(obj interface{}) *Request { if r.err != nil { return r } switch t := obj.(type) { case string: data, err := ioutil.ReadFile(t) if err != nil { r.err = err return r } glog.V(8).Infof("Request Body: %#v", string(data)) r.body = bytes.NewReader(data) case []byte: glog.V(8).Infof("Request Body: %#v", string(t)) r.body = bytes.NewReader(t) case io.Reader: r.body = t case runtime.Object: if reflect.ValueOf(t).IsNil() { return r } data, err := runtime.Encode(r.serializers.Encoder, t) if err != nil { r.err = err return r } glog.V(8).Infof("Request Body: %#v", string(data)) r.body = bytes.NewReader(data) r.SetHeader("Content-Type", r.content.ContentType) default: r.err = fmt.Errorf("unknown type used for body: %+v", obj) } return r }
|
再来看如何根据这些参数组装URL。
URL()
URL()根据参数返回URL,URL的格式为:/namespaces/namespace/resource/resourceName/subresource/subpath。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| func (r *Request) URL() *url.URL { p := r.pathPrefix if r.namespaceSet && len(r.namespace) > 0 { p = path.Join(p, "namespaces", r.namespace) } if len(r.resource) != 0 { p = path.Join(p, strings.ToLower(r.resource)) } if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 { p = path.Join(p, r.resourceName, r.subresource, r.subpath) } finalURL := &url.URL{} if r.baseURL != nil { *finalURL = *r.baseURL } finalURL.Path = p query := url.Values{} for key, values := range r.params { for _, value := range values { query.Add(key, value) } } if r.timeout != 0 { query.Set("timeout", r.timeout.String()) } finalURL.RawQuery = query.Encode() return finalURL }
|
再来看Request如何完成请求。
Do()
Do()为Request执行请求的入口。
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (r *Request) Do() Result { r.tryThrottle() var result Result err := r.request(func(req *http.Request, resp *http.Response) { result = r.transformResponse(resp, req) }) if err != nil { return Result{err: err} } return result }
|
Do()通过调用request()方法执行请求。
request()
request()调用了Go语言http包执行请求,在请求执行完后,调用resp.Body.Close()关闭连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| func (r *Request) request(fn func(*http.Request, *http.Response)) error { start := time.Now() defer func() { metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start)) }() if r.err != nil { glog.V(4).Infof("Error in request: %v", r.err) return r.err } if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 { return fmt.Errorf("an empty namespace may not be set when a resource name is provided") } if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 { return fmt.Errorf("an empty namespace may not be set during creation") } client := r.client if client == nil { client = http.DefaultClient } maxRetries := 10 retries := 0 for { url := r.URL().String() req, err := http.NewRequest(r.verb, url, r.body) if err != nil { return err } req.Header = r.headers r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) if retries > 0 { r.tryThrottle() } resp, err := client.Do(req) updateURLMetrics(r, resp, err) if err != nil { r.backoffMgr.UpdateBackoff(r.URL(), err, 0) } else { r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode) } if err != nil { return err } done := func() bool { defer func() { const maxBodySlurpSize = 2 << 10 if resp.ContentLength <= maxBodySlurpSize { io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize}) } resp.Body.Close() }() retries++ if seconds, wait := checkWait(resp); wait && retries < maxRetries { if seeker, ok := r.body.(io.Seeker); ok && r.body != nil { _, err := seeker.Seek(0, 0) if err != nil { glog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body) fn(req, resp) return true } } glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url) r.backoffMgr.Sleep(time.Duration(seconds) * time.Second) return false } fn(req, resp) return true }() if done { return nil } } }
|
Watch()
Watch入口。Watch()不会主要关闭连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| func (r *Request) Watch() (watch.Interface, error) { if r.err != nil { return nil, r.err } if r.serializers.Framer == nil { return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType) } url := r.URL().String() req, err := http.NewRequest(r.verb, url, r.body) if err != nil { return nil, err } req.Header = r.headers client := r.client if client == nil { client = http.DefaultClient } r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) resp, err := client.Do(req) updateURLMetrics(r, resp, err) if r.baseURL != nil { if err != nil { r.backoffMgr.UpdateBackoff(r.baseURL, err, 0) } else { r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode) } } if err != nil { if net.IsProbableEOF(err) { return watch.NewEmptyWatch(), nil } return nil, err } if resp.StatusCode != http.StatusOK { defer resp.Body.Close() if result := r.transformResponse(resp, req); result.err != nil { return nil, result.err } return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode) } framer := r.serializers.Framer.NewFrameReader(resp.Body) decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer) return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil }
|
关于Watch(),将专门进行讨论。
附录
http包的简单例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func httpDo() { client := &http.Client{} req, err := http.NewRequest("POST", "http://www.baidu.com", strings.NewReader("name=cjb")) if err != nil { } req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Cookie", "name=anny") resp, err := client.Do(req) defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { } fmt.Println(string(body)) }
|
可以看出,在打开了连接后,需要通过resp.Body.Close()进行关闭。