Config

Config中含有apiserver相关的信息,可以生成RESTClient,定义在/pkg/client/restclient/config.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Config holds the apiserver connection settings from which a RESTClient
// can be built (see RESTClientFor).
//
// Example value from a debug dump (the user agent shows kubectl v1.5.2):
// &{https://kubernetes:6443 /api {application/json application/json v1 0xc420109240} <nil> <nil> {/etc/kubernetes/security/serverkey/server.crt /etc/kubernetes/security/serverkey/server.key /etc/kubernetes/security/serverkey/ca.crt [] [] []} false kubectl/v1.5.2+08e0995 (linux/amd64) kubernetes/08e0995 <nil> <nil> 0 0 <nil> 0s}
type Config struct {
// Host must be a host string, a host:port pair, or a URL to the base of the apiserver.
// If a URL is given then the (optional) Path of that URL represents a prefix that must
// be appended to all request URIs used to access the apiserver. This allows a frontend
// proxy to easily relocate all of the apiserver endpoints.
Host string
// APIPath is a sub-path that points to an API root.
APIPath string
// Prefix is the sub path of the server. If not specified, the client will set
// a default value. Use "/" to indicate the server root should be used.
Prefix string
// ContentConfig contains settings that affect how objects are transformed when
// sent to the server. It is embedded, so its fields (for example GroupVersion
// and NegotiatedSerializer, both checked by RESTClientFor) are promoted onto Config.
ContentConfig
// Username and Password are used when the server requires Basic authentication.
Username string
Password string
// Server requires Bearer authentication. This client will not attempt to use
// refresh tokens for an OAuth2 flow.
// TODO: demonstrate an OAuth2 compatible client.
BearerToken string
// Impersonate is the username that this RESTClient will impersonate.
Impersonate string
// Server requires plugin-specified authentication.
AuthProvider *clientcmdapi.AuthProviderConfig
// Callback to persist config for AuthProvider.
AuthConfigPersister AuthProviderConfigPersister
// TLSClientConfig contains settings to enable transport layer security.
// It is embedded, so its fields are promoted onto Config.
TLSClientConfig
// Server should be accessed without verifying the TLS
// certificate. For testing only.
Insecure bool
// UserAgent is an optional field that specifies the caller of this request.
UserAgent string
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options. Use WrapTransport
// for most client level operations.
Transport http.RoundTripper
// WrapTransport will be invoked for custom HTTP behavior after the underlying
// transport is initialized (either the transport created from TLSClientConfig,
// Transport, or http.DefaultTransport). The config may layer other RoundTrippers
// on top of the returned RoundTripper.
WrapTransport func(rt http.RoundTripper) http.RoundTripper
// QPS indicates the maximum QPS to the master from this client.
// If it's zero, the created RESTClient will use DefaultQPS: 5
QPS float32
// Maximum burst for throttle.
// If it's zero, the created RESTClient will use DefaultBurst: 10.
Burst int
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
Timeout time.Duration
// Version forces a specific version to be used (if registered)
// Do we need this?
// Version string
}

可以通过调用RESTClientFor()或UnversionedRESTClientFor()来生成restClient,以RESTClientFor()为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// RESTClientFor builds a RESTClient from config.
//
// config.GroupVersion and config.NegotiatedSerializer must both be set,
// otherwise an error is returned. A zero QPS or Burst is replaced by
// DefaultQPS or DefaultBurst. Errors from defaultServerUrlFor and
// TransportFor are returned unchanged.
func RESTClientFor(config *Config) (*RESTClient, error) {
	if config.GroupVersion == nil {
		return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
	}
	if config.NegotiatedSerializer == nil {
		return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
	}

	qps := config.QPS
	if config.QPS == 0.0 {
		qps = DefaultQPS
	}
	burst := config.Burst
	if config.Burst == 0 {
		burst = DefaultBurst
	}

	// Example result: baseURL = https://kubernetes:6443, versionedAPIPath = /api/v1
	baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
	if err != nil {
		return nil, err
	}

	// Build the (possibly TLS-enabled) transport described by config.
	transport, err := TransportFor(config)
	if err != nil {
		return nil, err
	}

	// A dedicated http.Client is required whenever the transport OR the
	// timeout differs from the defaults. Previously the timeout was applied
	// only inside the custom-transport branch, so config.Timeout was silently
	// dropped whenever TransportFor returned http.DefaultTransport.
	// When httpClient stays nil, the request path falls back to
	// http.DefaultClient.
	var httpClient *http.Client
	if transport != http.DefaultTransport || config.Timeout > 0 {
		httpClient = &http.Client{Transport: transport}
		if config.Timeout > 0 {
			httpClient.Timeout = config.Timeout
		}
	}

	return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient)
}

在RESTClientFor()中,会使用httpClient = &http.Client{Transport: transport}生成httpClient,这部分将在之后的Go语言http包分析中介绍。RESTClientFor()最后调用NewRESTClient()生成RESTClient。

RESTClient

RESTClient可以生成Request,定义在/pkg/client/restclient/client.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// RESTClient carries everything needed to create Request objects against
// one API root: the server base URL, the versioned API path, content
// negotiation settings, a backoff factory, an optional rate limiter and an
// optional HTTP client.
type RESTClient struct {
// base is the root URL for all invocations of the client
// Example (debug dump): {https <nil> kubernetes:6443 / false }
base *url.URL
// versionedAPIPath is a path segment connecting the base URL to the resource root
// Example: /api/v1
versionedAPIPath string
// contentConfig is the information used to communicate with the server.
contentConfig ContentConfig
// serializers contain all serializers for underlying content type.
serializers Serializers
// creates BackoffManager that is passed to requests.
// Verb() calls it once for every Request it creates.
createBackoffMgr func() BackoffManager
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
// Throttle is handed to every Request created by Verb(); it may be nil.
Throttle flowcontrol.RateLimiter
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}

NewRESTClient()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// NewRESTClient assembles a RESTClient from already-resolved pieces.
//
// baseURL is copied, stripped of its query and fragment, and given a
// trailing "/" on its path. A nil config.GroupVersion is replaced by an
// empty GroupVersion and an empty config.ContentType by "application/json".
// An explicit rateLimiter takes precedence; otherwise a token-bucket limiter
// is created when maxQPS is positive, and no throttling is used when neither
// applies. An error from createSerializers is returned unchanged.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
	// Work on a private copy so the caller's URL is never modified.
	root := *baseURL
	if !strings.HasSuffix(root.Path, "/") {
		root.Path = root.Path + "/"
	}
	root.RawQuery, root.Fragment = "", ""

	// config is passed by value, so these defaults stay local to the new client.
	if config.GroupVersion == nil {
		config.GroupVersion = &unversioned.GroupVersion{}
	}
	if len(config.ContentType) == 0 {
		config.ContentType = "application/json"
	}

	codecs, err := createSerializers(config)
	if err != nil {
		return nil, err
	}

	var limiter flowcontrol.RateLimiter
	switch {
	case rateLimiter != nil:
		limiter = rateLimiter
	case maxQPS > 0:
		limiter = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
	}

	// Example: base = {https <nil> kubernetes:6443 / false }, versionedAPIPath = /api/v1
	restClient := &RESTClient{
		base:             &root,
		versionedAPIPath: versionedAPIPath,
		contentConfig:    config,
		serializers:      *codecs,
		createBackoffMgr: readExpBackoffConfig,
		Throttle:         limiter,
		Client:           client,
	}
	return restClient, nil
}

RESTClient可以通过Post(), Put(), Patch(), Get(), Delete()设置请求的动作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Post starts building a POST request; it is shorthand for c.Verb("POST").
func (c *RESTClient) Post() *Request {
	const verb = "POST"
	return c.Verb(verb)
}
// Put starts building a PUT request; it is shorthand for c.Verb("PUT").
func (c *RESTClient) Put() *Request {
	const verb = "PUT"
	return c.Verb(verb)
}
// Patch starts building a PATCH request and sets its Content-Type header
// to the string form of the given patch type pt.
func (c *RESTClient) Patch(pt api.PatchType) *Request {
	req := c.Verb("PATCH")
	return req.SetHeader("Content-Type", string(pt))
}
// Get starts building a GET request; it is shorthand for c.Verb("GET").
func (c *RESTClient) Get() *Request {
	const verb = "GET"
	return c.Verb(verb)
}
// Delete starts building a DELETE request; it is shorthand for c.Verb("DELETE").
func (c *RESTClient) Delete() *Request {
	const verb = "DELETE"
	return c.Verb(verb)
}

这些方法都调用了Verb()方法:

1
2
3
4
5
6
7
8
9
// Verb creates a new Request for the given HTTP verb, seeded with this
// client's base URL, versioned API path, content config, serializers,
// throttle and a freshly created backoff manager.
func (c *RESTClient) Verb(verb string) *Request {
	backoff := c.createBackoffMgr()

	// Leave the interface value truly nil when no client is configured;
	// wrapping a nil *http.Client in the interface would make it non-nil.
	var httpClient HTTPClient
	if c.Client != nil {
		httpClient = c.Client
	}
	return NewRequest(httpClient, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle)
}

在Verb()中,会调用NewRequest()函数生成Request。

Request

Request可以执行一个具体的请求。Request定义在/pkg/client/restclient/request.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Request accumulates everything needed to issue one call to the apiserver.
// It is filled in by NewRequest and the chained setter methods, turned into
// a URL by URL(), and executed by Do() or Watch().
type Request struct {
// required
// client performs the HTTP call; when nil, http.DefaultClient is used.
client HTTPClient
// verb is the HTTP method, e.g. "GET" or "POST".
verb string
// baseURL is copied into the final URL by URL(); it may be nil.
baseURL *url.URL
content ContentConfig
serializers Serializers
// generic components accessible via method setters
// pathPrefix is the leading part of the URL path (base path + versioned API path, extended by Prefix()).
pathPrefix string
// subpath is the trailing part of the URL path, extended by Suffix().
subpath string
// params holds the query parameters added through setParam().
params url.Values
headers http.Header
// structural elements of the request that are part of the Kubernetes API conventions
namespace string
// namespaceSet records that Namespace() was called, even with an empty namespace.
namespaceSet bool
resource string
resourceName string
subresource string
selector labels.Selector
// timeout, when non-zero, is emitted by URL() as the "timeout" query parameter.
timeout time.Duration
// output
// err holds an error recorded by a setter; most setters become no-ops once
// it is set, and request()/Watch() return it instead of sending anything.
err error
body io.Reader
// The constructed request and the response
req *http.Request
resp *http.Response
backoffMgr BackoffManager
throttle flowcontrol.RateLimiter
}

Request可以通过NewRequest()生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// NewRequest creates a Request for the given verb.
//
// The path prefix is "/" joined with baseURL's path (when baseURL is
// non-nil) and versionedAPIPath. A nil backoff is replaced by NoBackoff.
// The Accept header is taken from content.AcceptContentTypes when set,
// otherwise it is derived from content.ContentType with a "*/*" fallback;
// when both are empty no Accept header is set.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
	if backoff == nil {
		glog.V(2).Infof("Not implementing request backoff strategy.")
		backoff = &NoBackoff{}
	}

	root := "/"
	if baseURL != nil {
		root = path.Join(root, baseURL.Path)
	}

	req := &Request{
		client:      client,
		verb:        verb,
		baseURL:     baseURL,
		pathPrefix:  path.Join(root, versionedAPIPath),
		content:     content,
		serializers: serializers,
		backoffMgr:  backoff,
		throttle:    throttle,
	}

	if len(content.AcceptContentTypes) > 0 {
		req.SetHeader("Accept", content.AcceptContentTypes)
	} else if len(content.ContentType) > 0 {
		req.SetHeader("Accept", content.ContentType+", */*")
	}
	return req
}

Request的方法可以分成三类:

  • 设置Request结构体字段的方法;
  • 生成URL的方法;
  • 执行请求的方法。

先来介绍设置字段的方法。

Prefix()

Prefix()可以在原始pathPrefix后增加内容。

1
2
3
4
5
6
7
8
// Prefix appends the given segments to the request's path prefix.
// It does nothing if an earlier setter already recorded an error.
func (r *Request) Prefix(segments ...string) *Request {
	if r.err == nil {
		// Join the segments on their own first, then attach the result.
		extra := path.Join(segments...)
		r.pathPrefix = path.Join(r.pathPrefix, extra)
	}
	return r
}

Suffix()

Suffix()可以在原始subpath后增加内容。

1
2
3
4
5
6
7
8
// Suffix appends the given segments to the request's trailing sub-path.
// It does nothing if an earlier setter already recorded an error.
func (r *Request) Suffix(segments ...string) *Request {
	if r.err == nil {
		// Join the segments on their own first, then attach the result.
		extra := path.Join(segments...)
		r.subpath = path.Join(r.subpath, extra)
	}
	return r
}

Resource()

设置resource字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Resource sets the resource segment of the request path.
// The resource may be set only once and must be a valid path segment name;
// a violation is recorded in r.err. Nothing happens if r.err is already set.
func (r *Request) Resource(resource string) *Request {
	switch {
	case r.err != nil:
		// Keep the earlier error untouched.
	case len(r.resource) != 0:
		r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
	default:
		if problems := pathvalidation.IsValidPathSegmentName(resource); len(problems) != 0 {
			r.err = fmt.Errorf("invalid resource %q: %v", resource, problems)
		} else {
			r.resource = resource
		}
	}
	return r
}

SubResource()

设置subresource字段。例如在“/api/v1/namespaces/{namespace}/resourcequotas/{name}/status”中,status就是subresource。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// SubResource sets the sub-resource part of the request path, e.g. "status"
// in /api/v1/namespaces/{namespace}/resourcequotas/{name}/status.
//
// The given segments are joined with path.Join. The sub-resource may be set
// only once, and every segment must be a valid path segment name; a
// violation is recorded in r.err. Nothing happens if r.err is already set.
func (r *Request) SubResource(subresources ...string) *Request {
	if r.err != nil {
		return r
	}
	subresource := path.Join(subresources...)
	if len(r.subresource) != 0 {
		// Report the sub-resource that is already set; this message
		// previously printed r.resource by mistake.
		r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.subresource, subresource)
		return r
	}
	for _, s := range subresources {
		if msgs := pathvalidation.IsValidPathSegmentName(s); len(msgs) != 0 {
			r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
			return r
		}
	}
	r.subresource = subresource
	return r
}

Name()

设置resourceName字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Name sets the name of the resource to access
// (<resource>/[ns/<namespace>/]<name>).
// The name must be non-empty, may be set only once, and must be a valid path
// segment name; a violation is recorded in r.err. Nothing happens if r.err
// is already set.
func (r *Request) Name(resourceName string) *Request {
	if r.err != nil {
		return r
	}
	if len(resourceName) == 0 {
		r.err = fmt.Errorf("resource name may not be empty")
	} else if len(r.resourceName) != 0 {
		r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
	} else if problems := pathvalidation.IsValidPathSegmentName(resourceName); len(problems) != 0 {
		r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, problems)
	} else {
		r.resourceName = resourceName
	}
	return r
}

Namespace()

设置namespaceSet和namespace字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Namespace applies the namespace scope to the request
// (<resource>/[ns/<namespace>/]<name>).
// It may be called only once and the value must pass path-segment
// validation; a violation is recorded in r.err. On success both
// namespaceSet and namespace are updated. Nothing happens if r.err is
// already set.
func (r *Request) Namespace(namespace string) *Request {
	switch {
	case r.err != nil:
		// Keep the earlier error untouched.
	case r.namespaceSet:
		r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
	default:
		if problems := pathvalidation.IsValidPathSegmentName(namespace); len(problems) != 0 {
			r.err = fmt.Errorf("invalid namespace %q: %v", namespace, problems)
		} else {
			r.namespaceSet = true
			r.namespace = namespace
		}
	}
	return r
}

FieldsSelectorParam()

处理fields selector。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// FieldsSelectorParam adds the given field selector as a query parameter.
// A nil or empty selector is ignored, as is any call made after an error
// was recorded. Each field/value pair is first passed through
// fieldMappings.filterField for the request's group version and resource;
// a transformation error is recorded in r.err.
func (r *Request) FieldsSelectorParam(s fields.Selector) *Request {
	if r.err != nil || s == nil || s.Empty() {
		return r
	}

	mapField := func(field, value string) (newField, newValue string, err error) {
		return fieldMappings.filterField(r.content.GroupVersion, r.resource, field, value)
	}
	transformed, err := s.Transform(mapField)
	if err != nil {
		r.err = err
		return r
	}

	paramName := unversioned.FieldSelectorQueryParam(r.content.GroupVersion.String())
	return r.setParam(paramName, transformed.String())
}

LabelsSelectorParam()

处理label selector。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// LabelsSelectorParam adds the given label selector as a query parameter.
// A nil or empty selector is ignored, as is any call made after an error
// was recorded.
func (r *Request) LabelsSelectorParam(s labels.Selector) *Request {
	if r.err != nil || s == nil || s.Empty() {
		return r
	}
	paramName := unversioned.LabelSelectorQueryParam(r.content.GroupVersion.String())
	return r.setParam(paramName, s.String())
}

setParam()

设置params字段。

1
2
3
4
5
6
7
8
9
10
11
12
// setParam appends value to the query parameter paramName, creating the
// parameter map on first use. Names listed in specialParams are rejected
// and the rejection is recorded in r.err.
func (r *Request) setParam(paramName, value string) *Request {
	if specialParams.Has(paramName) {
		r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
		return r
	}
	if r.params == nil {
		r.params = url.Values{}
	}
	// Add appends to any values already stored under paramName.
	r.params.Add(paramName, value)
	return r
}

SetHeader()

设置headers字段。

1
2
3
4
5
6
7
8
// SetHeader sets the request header key to value, replacing any existing
// values for that key. The header map is created on first use.
func (r *Request) SetHeader(key, value string) *Request {
	if r.headers == nil {
		r.headers = make(http.Header)
	}
	r.headers.Set(key, value)
	return r
}

Timeout()

设置timeout字段。

1
2
3
4
5
6
7
8
9
10
// Timeout stores d as the request timeout; URL() later emits a non-zero
// value as the "timeout" query parameter. The call is ignored if an earlier
// setter already recorded an error.
func (r *Request) Timeout(d time.Duration) *Request {
	if r.err == nil {
		r.timeout = d
	}
	return r
}

Body()

设置body字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Body sets the request body from obj, whose dynamic type selects the source:
//
//   - string: treated as a file path; the file's contents become the body
//   - []byte: used as the body directly
//   - io.Reader: used as the body directly
//   - runtime.Object: encoded with the request's Encoder, and the
//     Content-Type header is set to the configured content type; a nil
//     pointer leaves the body untouched
//
// Any other type, or a read/encode failure, is recorded in r.err.
// The call is ignored if r.err is already set.
func (r *Request) Body(obj interface{}) *Request {
	if r.err != nil {
		return r
	}
	switch v := obj.(type) {
	case string:
		contents, err := ioutil.ReadFile(v)
		if err != nil {
			r.err = err
		} else {
			glog.V(8).Infof("Request Body: %#v", string(contents))
			r.body = bytes.NewReader(contents)
		}
	case []byte:
		glog.V(8).Infof("Request Body: %#v", string(v))
		r.body = bytes.NewReader(v)
	case io.Reader:
		r.body = v
	case runtime.Object:
		// Callers may pass typed nil pointers, which a plain nil comparison
		// would miss, so the check is done through reflection.
		if !reflect.ValueOf(v).IsNil() {
			encoded, err := runtime.Encode(r.serializers.Encoder, v)
			if err != nil {
				r.err = err
			} else {
				glog.V(8).Infof("Request Body: %#v", string(encoded))
				r.body = bytes.NewReader(encoded)
				r.SetHeader("Content-Type", r.content.ContentType)
			}
		}
	default:
		r.err = fmt.Errorf("unknown type used for body: %+v", obj)
	}
	return r
}

再来看如何根据这些参数组装URL。

URL()

URL()根据已设置的字段组装并返回URL,其路径格式为:{pathPrefix}/namespaces/{namespace}/{resource}/{resourceName}/{subresource}/{subpath};params和timeout会被编码为查询参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// URL returns the URL the request currently points at.
//
// The path is assembled as
//
//	{pathPrefix}/namespaces/{namespace}/{resource}/{resourceName}/{subresource}/{subpath}
//
// where absent parts are skipped and the resource is lower-cased. A
// sub-resource is a segment such as "log" in
// /api/v1/namespaces/{namespace}/pods/{name}/log; a sub-path is a free-form
// tail such as /healthz. Scheme, host and other fields are copied from
// baseURL when it is set. Query parameters come from r.params plus a
// "timeout" entry when a timeout is configured.
func (r *Request) URL() *url.URL {
	fullPath := r.pathPrefix
	if r.namespaceSet && r.namespace != "" {
		fullPath = path.Join(fullPath, "namespaces", r.namespace)
	}
	if r.resource != "" {
		fullPath = path.Join(fullPath, strings.ToLower(r.resource))
	}
	// Join trims trailing slashes, so only join the tail when there is one;
	// this keeps pathPrefix's trailing slash intact when nothing was appended.
	hasTail := r.resourceName != "" || r.subpath != "" || r.subresource != ""
	if hasTail {
		fullPath = path.Join(fullPath, r.resourceName, r.subresource, r.subpath)
	}

	result := new(url.URL)
	if r.baseURL != nil {
		*result = *r.baseURL
	}
	result.Path = fullPath

	// Copy the parameters so the request's own map is left untouched.
	q := make(url.Values)
	for name, vals := range r.params {
		for i := range vals {
			q.Add(name, vals[i])
		}
	}
	// The timeout is not stored in r.params; it is added here.
	if r.timeout != 0 {
		q.Set("timeout", r.timeout.String())
	}
	result.RawQuery = q.Encode()
	return result
}

再来看Request如何完成请求。

Do()

Do()为Request执行请求的入口。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Do executes the request and returns its Result.
// It throttles first, then runs request(); the response is converted by
// transformResponse. If request() fails, the Result carries only that error.
func (r *Request) Do() Result {
	r.tryThrottle()

	var out Result
	capture := func(req *http.Request, resp *http.Response) {
		out = r.transformResponse(resp, req)
	}
	if err := r.request(capture); err != nil {
		return Result{err: err}
	}
	return out
}

Do()通过调用request()方法执行请求。

request()

request()调用了Go语言http包执行请求,在请求执行完后,调用resp.Body.Close()关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// request is the function that actually sends the HTTP request.
//
// It returns any error previously recorded on the Request, rejects requests
// that set an empty namespace where one is not allowed, and then sends the
// request in a loop: when checkWait reports that the server asked the client
// to wait and fewer than maxRetries attempts have been made, it sleeps and
// tries again. Otherwise fn is invoked with the request and response.
// The response body is drained (up to maxBodySlurpSize bytes) and closed
// after every attempt, so fn must consume whatever it needs before returning.
// Total latency is reported to metrics.RequestLatency on exit.
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
// Metrics for total request latency
start := time.Now()
defer func() {
metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
}()
if r.err != nil {
glog.V(4).Infof("Error in request: %v", r.err)
return r.err
}
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set during creation")
}
// Fall back to the default HTTP client when none was configured.
client := r.client
if client == nil {
client = http.DefaultClient
}
// Right now we make about ten retry attempts if we get a Retry-After response.
// TODO: Change to a timeout based approach.
maxRetries := 10
retries := 0
for {
// Assemble the URL.
url := r.URL().String()
// Build the HTTP request; a fresh one is created for every attempt.
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return err
}
req.Header = r.headers
// Wait for whatever backoff the manager has computed for this URL.
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
if retries > 0 {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal throttler.
r.tryThrottle()
}
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
// Feed the outcome back to the backoff manager; status code 0 marks a transport error.
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
return err
}
// done reports whether the loop should stop (true) or retry (false).
done := func() bool {
// Ensure the response body is fully read and closed
// before we reconnect, so that we reuse the same TCP
// connection.
defer func() {
// 2 << 10 is 2048 bytes; bodies declared larger than that are closed without draining.
const maxBodySlurpSize = 2 << 10
if resp.ContentLength <= maxBodySlurpSize {
io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
}
resp.Body.Close()
}()
retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
// Rewind a seekable body so the retry sends it from the start again.
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
// The body cannot be replayed: give up retrying and hand this response to fn.
glog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
fn(req, resp)
return true
}
}
// NOTE(review): seconds is converted with time.Duration(seconds) below, so it looks
// numeric while the format verb is %s — confirm the intended log output.
glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
return true
}()
if done {
return nil
}
}
}

Watch()

Watch入口。请求成功时,Watch()不会主动关闭连接,响应体会交给StreamWatcher持续读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
//
// The request is sent once, without the client-side rate limiter. On a
// 200 response the body is NOT closed here: it is wrapped in a frame reader
// and streaming decoder and handed to the returned stream watcher. On any
// other status the body is closed and an error is returned. A transport
// error that net.IsProbableEOF recognises yields an empty watch instead of
// an error.
func (r *Request) Watch() (watch.Interface, error) {
// We specifically don't want to rate limit watches, so we
// don't use r.throttle here.
if r.err != nil {
return nil, r.err
}
// Without a framer the response stream cannot be split into events.
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return nil, err
}
req.Header = r.headers
// Fall back to the default HTTP client when none was configured.
client := r.client
if client == nil {
client = http.DefaultClient
}
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
// NOTE(review): the backoff is computed from r.URL() above but updated with
// r.baseURL here — confirm that the differing keys are intentional.
if r.baseURL != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
}
}
if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) {
return watch.NewEmptyWatch(), nil
}
return nil, err
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
// Prefer the error decoded from the response; otherwise report the bare status.
if result := r.transformResponse(resp, req); result.err != nil {
return nil, result.err
}
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
}
framer := r.serializers.Framer.NewFrameReader(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
// The returned value is a StreamWatcher reading from the open response body.
return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil
}

关于Watch(),将专门进行讨论。

附录

http包的简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// httpDo is a minimal net/http example: it POSTs a form-encoded body with
// two custom headers and prints the response body to standard output.
//
// Every step that can fail is checked and aborts the function, because
// continuing with a nil request or nil response would panic. The response
// body is closed via defer only after client.Do has succeeded.
func httpDo() {
	client := &http.Client{}

	req, err := http.NewRequest("POST", "http://www.baidu.com", strings.NewReader("name=cjb"))
	if err != nil {
		fmt.Println("building request:", err)
		return
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("Cookie", "name=anny")

	resp, err := client.Do(req)
	if err != nil {
		// resp is nil here, so there is nothing to close.
		fmt.Println("sending request:", err)
		return
	}
	// Closing the body releases the underlying connection.
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("reading response body:", err)
		return
	}
	fmt.Println(string(body))
}

可以看出,在打开了连接后,需要通过resp.Body.Close()进行关闭。