我们都知道,在Kubernetes中,Apiserver提供了API访问服务。那么API是什么时候注册到Apiserver中的呢?本次分析,将会介绍这些内容。

emicklei/go-restful

Kubernetes使用emicklei/go-restful包提供RESTful API服务。所以有必要先来了解下emicklei/go-restful是如何使用的。

  1. 创建container
  2. 创建ws
  3. 生成route,即ws.GET("/hello").To(hello)
  4. ws.Route(route)
  5. container.Add(ws)
  6. http.Server{}

即如下代码形式:

1
2
3
4
5
6
7
8
9
10
// Step 1: create a go-restful container.
container := restful.NewContainer()
// Step 2: create a WebService.
ws := new(restful.WebService)
// Steps 3-4: build the GET /hello route bound to hello and register it on the WebService.
ws.Route(ws.GET("/hello").To(hello))
// Step 5: add the WebService to the container.
container.Add(ws)
// Step 6: use the container as the Handler of a standard http.Server on :8081.
server := &http.Server{Addr: ":8081", Handler: container}
log.Fatal(server.ListenAndServe())
// hello is the route function for GET /hello; it writes a fixed string to the response.
func hello(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "default world")
}

container的生成

所以,我们第一件事就是寻找在哪生成的container。container的生成函数定义在/pkg/genericapiserver/mux/container.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// APIContainer is a restful container which in addition support registering
// handlers that do not show up in swagger or in /
type APIContainer struct {
// Container is the embedded go-restful container; its methods are promoted
// onto APIContainer.
*restful.Container
// NonSwaggerRoutes wraps the same mux as the container (see NewAPIContainer).
// NOTE(review): presumably for handlers that must not show up in swagger — confirm.
NonSwaggerRoutes PathRecorderMux
// SecretRoutes is the underlying mux itself (see NewAPIContainer).
// NOTE(review): presumably for handlers that must not be listed under "/" — confirm.
SecretRoutes Mux
}
// NewAPIContainer constructs a new container for APIs.
//
// The go-restful container is created first and bound to mux, switched to the
// curly router, and given the recover and service-error handlers built from
// s. The same mux also backs NonSwaggerRoutes and SecretRoutes of the
// returned APIContainer.
func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *APIContainer {
	container := restful.NewContainer()
	container.ServeMux = mux
	// The curly router is needed for paths such as proxy/{kind}/{name}/{*}.
	container.Router(restful.CurlyRouter{})
	apiserver.InstallRecoverHandler(s, container)
	apiserver.InstallServiceErrorHandler(s, container)

	return &APIContainer{
		Container:        container,
		NonSwaggerRoutes: PathRecorderMux{mux: mux},
		SecretRoutes:     mux,
	}
}

container.go中的APIContainer内嵌了restful.Container。那么是哪调用了NewAPIContainer呢?是在/pkg/genericapiserver/config.go的completedConfig结构体的New()中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// New builds a GenericAPIServer from the completed config.
// It returns an error when c.Serializer is nil.
func (c completedConfig) New() (*GenericAPIServer, error) {
if c.Serializer == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
}
s := &GenericAPIServer{
discoveryAddresses: c.DiscoveryAddresses,
LoopbackClientConfig: c.LoopbackClientConfig,
legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
admissionControl: c.AdmissionControl,
requestContextMapper: c.RequestContextMapper,
Serializer: c.Serializer,
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
enableSwaggerSupport: c.EnableSwaggerSupport,
SecureServingInfo: c.SecureServingInfo,
InsecureServingInfo: c.InsecureServingInfo,
ExternalAddress: c.ExternalAddress,
apiGroupsForDiscovery: map[string]unversioned.APIGroup{},
enableOpenAPISupport: c.EnableOpenAPISupport,
openAPIConfig: c.OpenAPIConfig,
postStartHooks: map[string]postStartHookEntry{},
}
// Create the go-restful container, backed by a fresh http.ServeMux.
s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer)
// Install the special-purpose paths.
s.installAPI(c.Config)
// Call BuildHandlerChainsFunc() to produce the secure and insecure handlers.
// Per the original author's note, this wraps the container's mux further so
// that authentication, authorization, etc. can be applied.
s.Handler, s.InsecureHandler = c.BuildHandlerChainsFunc(s.HandlerContainer.ServeMux, c.Config)
return s, nil
}

所以container对应的就是GenericAPIServer的HandlerContainer字段。

API安装入口

我们接着往下分析。先来看API安装入口,即/pkg/master/master.go中completedConfig的New()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// New builds a Master from the completed config.
// It returns an error when KubeletClientConfig is empty or when the
// underlying GenericAPIServer cannot be created.
func (c completedConfig) New() (*Master, error) {
if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
// Build the GenericAPIServer from the generic config.
s, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
if err != nil {
return nil, err
}
// UI redirect support.
if c.EnableUISupport {
routes.UIRedirect{}.Install(s.HandlerContainer)
}
// Log-serving support.
if c.EnableLogsSupport {
routes.Logs{}.Install(s.HandlerContainer)
}
m := &Master{
GenericAPIServer: s,
}
// Build the restOptionsFactory; its NewFor method is passed below as the
// RESTOptionsGetter.
restOptionsFactory := restOptionsFactory{
deleteCollectionWorkers: c.DeleteCollectionWorkers,
enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
storageFactory: c.StorageFactory,
}
// Choose the storageDecorator depending on whether the watch cache is enabled.
if c.EnableWatchCache {
restOptionsFactory.storageDecorator = registry.StorageWithCacher
} else {
restOptionsFactory.storageDecorator = generic.UndecoratedStorage
}
// install legacy rest storage
if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.StorageFactory,
ProxyTransport: c.ProxyTransport,
KubeletClientConfig: c.KubeletClientConfig,
EventTTL: c.EventTTL,
ServiceIPRange: c.ServiceIPRange,
ServiceNodePortRange: c.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
}
// Install the legacy (core) API via InstallLegacyAPI().
m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
}
// Build the list of RESTStorageProviders, one per API group.
restStorageProviders := []genericapiserver.RESTStorageProvider{
appsrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
// The extensions group; it is given a third-party resource server.
extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, c.StorageFactory)},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser},
storagerest.RESTStorageProvider{},
}
// Install the API groups of all providers via InstallAPIs().
m.InstallAPIs(c.Config.GenericConfig.APIResourceConfigSource, restOptionsFactory.NewFor, restStorageProviders...)
if c.Tunneler != nil {
m.installTunneler(c.Tunneler, coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}
return m, nil
}

可以看出,New()方法调用了InstallLegacyAPI()和InstallAPIs()安装API。

InstallLegacyAPI()

InstallLegacyAPI()就是安装Kubernetes一些传统核心的API,定义在/pkg/master/master.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// InstallLegacyAPI builds the legacy (core) REST storage and installs the
// resulting API group on the GenericAPIServer. Any failure is fatal: the
// process is terminated through glog.Fatalf.
func (m *Master) InstallLegacyAPI(c *Config, restOptionsGetter genericapiserver.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
// Call NewLegacyRESTStorage(), defined in /pkg/registry/core/rest/storage_core.go.
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
glog.Fatalf("Error building core storage: %v", err)
}
// Optionally register the bootstrap controller as a post-start hook.
if c.EnableCoreControllers {
serviceClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, serviceClient)
if err := m.GenericAPIServer.AddPostStartHook("bootstrap-controller", bootstrapController.PostStartHook); err != nil {
glog.Fatalf("Error registering PostStartHook %q: %v", "bootstrap-controller", err)
}
}
// Install the legacy API group.
// DefaultLegacyAPIPrefix = "/api"
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
glog.Fatalf("Error in registering group versions: %v", err)
}
}

InstallLegacyAPI()先通过NewLegacyRESTStorage()生成apiGroupInfo,然后通过GenericAPIServer的InstallLegacyAPIGroup()安装apiGroupInfo。关于apiGroupInfo及InstallLegacyAPIGroup(),稍后介绍。

NewLegacyRESTStorage()定义在/pkg/registry/core/rest/storage_core.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// NewLegacyRESTStorage builds the REST storage of the legacy core API group.
// It returns the LegacyRESTStorage holding the registries and allocators, the
// APIGroupInfo whose "v1" entry maps every resource path to its storage, and
// an error if a client, the node storage or the service storage config cannot
// be created, or if the service cluster IP range is missing.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
// Build the APIGroupInfo.
apiGroupInfo := genericapiserver.APIGroupInfo{
// Look up the GroupMeta registered for api.GroupName in the registered package.
GroupMeta: *registered.GroupOrDie(api.GroupName),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{},
}
// When autoscaling/v1 is enabled, the scale subresource uses its Scale kind.
if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
}
// When policy/v1beta1 is enabled, the eviction subresource uses its Eviction
// kind and a policy client is created; otherwise podDisruptionClient stays nil.
var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}); registered.IsEnabledVersion(policyGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
var err error
podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
}
restStorage := LegacyRESTStorage{}
// Create the per-resource storages.
podTemplateStorage := podtemplateetcd.NewREST(restOptionsGetter(api.Resource("podTemplates")))
eventStorage := eventetcd.NewREST(restOptionsGetter(api.Resource("events")), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(restOptionsGetter(api.Resource("limitRanges")))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptionsGetter(api.Resource("resourceQuotas")))
secretStorage := secretetcd.NewREST(restOptionsGetter(api.Resource("secrets")))
serviceAccountStorage := serviceaccountetcd.NewREST(restOptionsGetter(api.Resource("serviceAccounts")))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumes")))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumeClaims")))
configMapStorage := configmapetcd.NewREST(restOptionsGetter(api.Resource("configMaps")))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptionsGetter(api.Resource("namespaces")))
restStorage.NamespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints")))
restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage)
// Create nodeStorage.
// restOptionsGetter is actually restOptionsFactory.NewFor() from /pkg/master/master.go.
// Per the original note, api.Resource() is defined in /pkg/api/registry.go (verify).
nodeStorage, err := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClientConfig, c.ProxyTransport)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
// Create the NodeRegistry.
// nodeStorage.Node is the storage handed to the NodeRegistry.
restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node)
podStorage := podetcd.NewStorage(
restOptionsGetter(api.Resource("pods")),
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
podDisruptionClient,
)
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services")))
restStorage.ServiceRegistry = service.NewRegistry(serviceRESTStorage)
var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := c.ServiceIPRange
if serviceClusterIPRange.IP == nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing")
}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
// The factory closure also stores the etcd-backed allocator in
// serviceClusterIPRegistry so it can be exposed on restStorage below.
// NOTE(review): this relies on the closure having run by the time
// NewAllocatorCIDRRange returns — confirm.
ServiceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
serviceClusterIPRegistry = etcd
return etcd
})
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
// Same capture pattern for the node port allocator.
var serviceNodePortRegistry rangeallocation.RangeRegistry
ServiceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
serviceNodePortRegistry = etcd
return etcd
})
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewStorage(restOptionsGetter(api.Resource("replicationControllers")))
serviceRest := service.NewStorage(restStorage.ServiceRegistry, restStorage.EndpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
// Build restStorageMap.
// restStorageMap records the mapping from resource path to storage.
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
// Subresources that only exist when the corresponding group version is enabled.
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
// Fill VersionedResourcesStorageMap.
// VersionedResourcesStorageMap records the mapping from version to restStorageMap.
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
return restStorage, apiGroupInfo, nil
}

InstallAPIs()

InstallAPIs()用来安装核心API组(/api)之外的其它API组,这些组统一注册在/apis前缀下。包含:apps/v1beta1, authentication.k8s.io/v1beta1, authorization.k8s.io/v1beta1, autoscaling/v1, batch/v1, certificates.k8s.io/v1alpha1, extensions/v1beta1, policy/v1beta1, rbac.authorization.k8s.io/v1alpha1, storage.k8s.io/v1beta1。
InstallAPIs()定义在/pkg/master/master.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
// Providers whose group is disabled, or whose NewRESTStorage reports
// enabled == false, are skipped. Errors while building or registering a
// post-start hook, or while installing a group, are fatal (glog.Fatalf).
func (m *Master) InstallAPIs(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter, restStorageProviders ...genericapiserver.RESTStorageProvider) {
apiGroupsInfo := []genericapiserver.APIGroupInfo{}
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
if !apiResourceConfigSource.AnyResourcesForGroupEnabled(groupName) {
glog.V(1).Infof("Skipping disabled API group %q.", groupName)
continue
}
// NewRESTStorage() is called here to produce the apiGroupInfo.
// Per the original note, the groups are mainly apps/v1beta1,
// authentication.k8s.io/v1beta1, authorization.k8s.io/v1beta1,
// autoscaling/v1, batch/v1, certificates.k8s.io/v1alpha1,
// extensions/v1beta1, policy/v1beta1,
// rbac.authorization.k8s.io/v1alpha1 and storage.k8s.io/v1beta1.
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if !enabled {
glog.Warningf("Problem initializing API group %q, skipping.", groupName)
continue
}
glog.V(1).Infof("Enabling API group %q.", groupName)
// Providers that also implement PostStartHookProvider get their hook registered.
if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
if err != nil {
glog.Fatalf("Error building PostStartHook: %v", err)
}
if err := m.GenericAPIServer.AddPostStartHook(name, hook); err != nil {
glog.Fatalf("Error registering PostStartHook %q: %v", name, err)
}
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}
// Index into the slice so that a pointer to the stored element is passed.
for i := range apiGroupsInfo {
// Entry point of route installation.
// Example apiGroupInfo observed in debug output:
// {{policy/v1beta1 [policy/v1beta1] <nil> {} DefaultRESTMapper......
if err := m.GenericAPIServer.InstallAPIGroup(&apiGroupsInfo[i]); err != nil {
glog.Fatalf("Error in registering group versions: %v", err)
}
}
}

可以看出InstallAPIs()是通过NewRESTStorage()来生成apiGroupInfo,然后通过GenericAPIServer.InstallAPIGroup()安装apiGroupInfo。来看NewRESTStorage(),以/pkg/registry/rbac/rest/storage_rbac.go为例:

1
2
3
4
5
6
7
8
9
10
// NewRESTStorage builds the APIGroupInfo of the rbac group.
//
// The info starts from the defaults for rbac.GroupName. When any v1alpha1
// resource is enabled in apiResourceConfigSource, the v1alpha1 storage map is
// attached under that version and the group's GroupVersion is set to
// v1alpha1. The second return value is always true.
func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool) {
	info := genericapiserver.NewDefaultAPIGroupInfo(rbac.GroupName)

	gv := rbacapiv1alpha1.SchemeGroupVersion
	if !apiResourceConfigSource.AnyResourcesForVersionEnabled(gv) {
		return info, true
	}

	info.VersionedResourcesStorageMap[gv.Version] = p.v1alpha1Storage(apiResourceConfigSource, restOptionsGetter)
	info.GroupMeta.GroupVersion = gv
	return info, true
}

NewRESTStorage()一般调用genericapiserver包的NewDefaultAPIGroupInfo()来生成apiGroupInfo。

所以来看下NewDefaultAPIGroupInfo(),定义在/pkg/genericapiserver/genericapiserver.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values.
// It is exposed for easier composition from other packages.
//
// The GroupMeta is looked up in the registered package for the given group,
// while OptionsExternalVersion always points at the GroupVersion registered
// for api.GroupName. The storage map starts out empty.
func NewDefaultAPIGroupInfo(group string) APIGroupInfo {
	info := APIGroupInfo{
		GroupMeta:                    *registered.GroupOrDie(group),
		VersionedResourcesStorageMap: make(map[string]map[string]rest.Storage),
		Scheme:                       api.Scheme,
		ParameterCodec:               api.ParameterCodec,
		NegotiatedSerializer:         api.Codecs,
	}
	info.OptionsExternalVersion = &registered.GroupOrDie(api.GroupName).GroupVersion
	return info
}

可以看出,要构造APIGroupInfo,都会从registered模块中获取到group对应的groupMeta,groupMeta包含mapper等group级别的有用信息。

APIGroupInfo

APIGroupInfo描述Group的相关信息及该Group下的API。APIGroupInfo定义在/pkg/genericapiserver/genericapiserver.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Usage example found online (illustrative pseudo-code, not compilable as is):
//   apiGroupInfo := apiserver.NewDefaultAPIGroupInfo(... Scheme)
//   v1alpha1storage := map[string]rest.Storage{}
//   v1alpha1storage["testobj"] = NewREST()
//   apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage
//   GenericAPIServer.InstallAPIGroup(&apiGroupInfo).Run()
//
// APIGroupInfo describes an API group and the storage behind its resources.
type APIGroupInfo struct {
// GroupMeta is the group's metadata; builders in this article obtain it from
// the registered package.
GroupMeta apimachinery.GroupMeta
// Info about the resources in this group. Its a map from version to resource to the storage.
// getAPIGroupVersion() converts the per-version map into the Storage of an APIGroupVersion.
// For the legacy group it is filled in NewLegacyRESTStorage()
// (/pkg/registry/core/rest/storage_core.go).
VersionedResourcesStorageMap map[string]map[string]rest.Storage
// OptionsExternalVersion controls the APIVersion used for common objects in the
// schema like api.Status, api.DeleteOptions, and api.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
// If nil, defaults to groupMeta.GroupVersion.
// TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
OptionsExternalVersion *unversioned.GroupVersion
// Scheme includes all of the types used by this group and how to convert between them (or
// to convert objects from outside of this group that are accepted in this API).
// TODO: replace with interfaces
Scheme *runtime.Scheme
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
// SubresourceGroupVersionKind contains the GroupVersionKind overrides for each subresource that is
// accessible from this API group version. The GroupVersionKind is that of the external version of
// the subresource. The key of this map should be the path of the subresource. The keys here should
// match the keys in the Storage map above for subresources.
SubresourceGroupVersionKind map[string]unversioned.GroupVersionKind
}

APIGroupInfo中与API最密切的就是VersionedResourcesStorageMap字段,其第一个string表示版本,第二个string表示资源路径,rest.Storage表示该路径对应的存储。

再来看下APIGroupInfo的安装函数InstallAPIGroup(),定义在/pkg/genericapiserver/genericapiserver.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// InstallAPIGroup installs the given API group under APIGroupPrefix and
// registers it for discovery. It wraps installAPIResources() and is called
// from master.go. An error is returned when the group or version of
// GroupMeta.GroupVersion is empty, or when installing the resources fails.
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
// Do not register empty group or empty version. Doing so claims /apis/ for the wrong entity to be returned.
// Catching these here places the error much closer to its origin
if len(apiGroupInfo.GroupMeta.GroupVersion.Group) == 0 {
return fmt.Errorf("cannot register handler with an empty group for %#v", *apiGroupInfo)
}
if len(apiGroupInfo.GroupMeta.GroupVersion.Version) == 0 {
return fmt.Errorf("cannot register handler with an empty version for %#v", *apiGroupInfo)
}
// APIGroupPrefix = "/apis"
// Install the REST resources of every version of the group.
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo); err != nil {
return err
}
// setup discovery
// Install the version handler.
// Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
apiVersionsForDiscovery := []unversioned.GroupVersionForDiscovery{}
for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
// Check the config to make sure that we elide versions that don't have any resources
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
continue
}
apiVersionsForDiscovery = append(apiVersionsForDiscovery, unversioned.GroupVersionForDiscovery{
GroupVersion: groupVersion.String(),
Version: groupVersion.Version,
})
}
// The preferred version is the group's own GroupMeta.GroupVersion.
preferedVersionForDiscovery := unversioned.GroupVersionForDiscovery{
GroupVersion: apiGroupInfo.GroupMeta.GroupVersion.String(),
Version: apiGroupInfo.GroupMeta.GroupVersion.Version,
}
apiGroup := unversioned.APIGroup{
Name: apiGroupInfo.GroupMeta.GroupVersion.Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferedVersionForDiscovery,
}
// Record the group for discovery and add its group-level web service to the container.
s.AddAPIGroupForDiscovery(apiGroup)
s.HandlerContainer.Add(apiserver.NewGroupWebService(s.Serializer, APIGroupPrefix+"/"+apiGroup.Name, apiGroup))
return nil
}

InstallAPIGroup()主要调用的是installAPIResources()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// installAPIResources is a private method for installing the REST storage
// backing each api groupversionresource.
//
// For every GroupVersion listed in apiGroupInfo.GroupMeta an APIGroupVersion
// (defined in /pkg/apiserver/apiserver.go) is built and installed into the
// handler container through InstallREST. The first failure aborts the loop
// and is returned.
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	for _, gv := range apiGroupInfo.GroupMeta.GroupVersions {
		version, err := s.getAPIGroupVersion(apiGroupInfo, gv, apiPrefix)
		if err != nil {
			return err
		}

		// An explicit OptionsExternalVersion on the group overrides the one
		// already set on the freshly built version.
		if external := apiGroupInfo.OptionsExternalVersion; external != nil {
			version.OptionsExternalVersion = external
		}

		// InstallREST creates the web service, registers its routes and adds
		// it to the container.
		err = version.InstallREST(s.HandlerContainer.Container)
		if err != nil {
			return fmt.Errorf("Unable to setup API %v: %v", apiGroupInfo, err)
		}
	}
	return nil
}

可以看出,installAPIResources()会遍历GroupMeta下的GroupVersions,通过getAPIGroupVersion()把apiGroupInfo、groupVersion等构造成apiGroupVersion,然后使用apiGroupVersion.InstallREST(s.HandlerContainer.Container)注册到HandlerContainer中。

getAPIGroupVersion()定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// getAPIGroupVersion builds the apiserver.APIGroupVersion for one version of
// an API group.
//
// The storage of the result comes from
// apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version], with every
// resource key lower-cased (an observed entry looks like
// "namespaces" -> &{0xc420814d20 0xc420814e10}). apiPrefix becomes the Root
// of the result.
//
// If newAPIGroupVersion fails, nil and its error are returned. The error is
// checked before the result is touched, so a nil result can no longer cause a
// nil-pointer dereference here.
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion unversioned.GroupVersion, apiPrefix string) (*apiserver.APIGroupVersion, error) {
	resources := apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]
	storage := make(map[string]rest.Storage, len(resources))
	for k, v := range resources {
		storage[strings.ToLower(k)] = v
	}

	version, err := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
	if err != nil {
		return nil, err
	}
	version.Root = apiPrefix
	version.Storage = storage
	return version, nil
}

所以,APIGroupVersion又是什么,接下来分析。

APIGroupVersion

APIGroupVersion用来构建路由,定义在/pkg/apiserver/apiserver.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// APIGroupVersion holds what is needed to build the web service routes of a
// single group version.
type APIGroupVersion struct {
// Storage maps a resource path to its rest.Storage; getAPIGroupVersion()
// fills it with lower-cased keys.
Storage map[string]rest.Storage
// Root is the API prefix (e.g. "/api" or "/apis"); newInstaller() joins it
// with the group and version to form the route prefix.
Root string
// GroupVersion is the external group version
GroupVersion unversioned.GroupVersion
// OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
// schema like api.Status, api.DeleteOptions, and api.ListOptions. Other implementors may
// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
// empty, defaults to GroupVersion.
OptionsExternalVersion *unversioned.GroupVersion
Mapper meta.RESTMapper
// Serializer is used to determine how to convert responses from API methods into bytes to send over
// the wire.
Serializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor
Copier runtime.ObjectCopier
Linker runtime.SelfLinker
Admit admission.Interface
Context api.RequestContextMapper
// MinRequestTimeout is copied into the APIInstaller by newInstaller().
MinRequestTimeout time.Duration
// SubresourceGroupVersionKind contains the GroupVersionKind overrides for each subresource that is
// accessible from this API group version. The GroupVersionKind is that of the external version of
// the subresource. The key of this map should be the path of the subresource. The keys here should
// match the keys in the Storage map above for subresources.
SubresourceGroupVersionKind map[string]unversioned.GroupVersionKind
// ResourceLister is an interface that knows how to list resources
// for this API Group.
ResourceLister APIResourceLister
}

APIGroupVersion结构体的Storage字段存储了路径和存储的关系;Root即apiPrefix,为/api或/apis;GroupVersion字段存储了GV信息。来看下APIGroupVersion的注册函数InstallREST():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// InstallREST registers the REST handlers (storage, watch, proxy and redirect)
// into a restful Container. It is expected that the provided path root prefix
// will serve all operations. Root MUST NOT end in a slash.
//
// A new web service is created by the installer, populated through Install,
// extended with the supported-resources endpoint and finally added to
// container. All registration errors are returned as one aggregate.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	installer := g.newInstaller()
	ws := installer.NewWebService()
	resources, installErrs := installer.Install(ws)

	// Fall back to a static lister over the installed resources unless a
	// ResourceLister was configured explicitly.
	var lister APIResourceLister = staticLister{resources}
	if g.ResourceLister != nil {
		lister = g.ResourceLister
	}
	AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)

	container.Add(ws)
	return utilerrors.NewAggregate(installErrs)
}

InstallREST()先生成一个webservice,然后通过installer.Install(ws)把API安装到该webservice中,最后把webservice加入到container中。
再来看下newInstaller():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// newInstaller creates the APIInstaller for this group version.
//
// The installer prefix is Root + Group + Version joined as a slash path.
// Prefixes observed while tracing this function:
//   /api/v1
//   /apis/apps/v1beta1
//   /apis/authentication.k8s.io/v1beta1
//   /apis/authorization.k8s.io/v1beta1
//   /apis/autoscaling/v1
//   /apis/batch/v1
//   /apis/batch/v2alpha1
//   /apis/certificates.k8s.io/v1alpha1
//   /apis/extensions/v1beta1
//   /apis/policy/v1beta1
//   /apis/rbac.authorization.k8s.io/v1alpha1
//   /apis/storage.k8s.io/v1beta1
func (g *APIGroupVersion) newInstaller() *APIInstaller {
	return &APIInstaller{
		group:             g,
		prefix:            path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version),
		minRequestTimeout: g.MinRequestTimeout,
	}
}

关于installer的分析,将在下次进行。

Container的启动

Container(即HandlerContainer)经BuildHandlerChainsFunc()封装后,成为GenericAPIServer中的Handler。GenericAPIServer的Run()方法中会启动监听。