上一次分析介绍了如何添加资源Application,现在就要在kube-controller-manager中添加一个controller对Appliaction进行一些操作。

添加application controller

在/pkg/controller目录下新建application目录,然后在application目录下添加application_controller.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package application contains all the logic for handling Kubernetes Application.
package application
import (
"time"
"strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
)
// ApplicationController is responsible for synchronizing Application objects.
type ApplicationController struct {
client clientset.Interface
eventRecorder record.EventRecorder
syncHandler func(dKey string) error
// Applications that need to be synced
queue workqueue.RateLimitingInterface
}
// NewApplicationController creates a new ApplicationController.
func NewApplicationController(aInformer informers.ApplicationInformer, client clientset.Interface) *ApplicationController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: client.Core().Events("")})
if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("application_controller", client.Core().RESTClient().GetRateLimiter())
}
ac := &ApplicationController{
client: client,
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "application-controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "application"),
}
aInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ac.addApplicationNotification,
UpdateFunc: ac.updateApplicationNotification,
// This will enter the sync loop and no-op, because the application has been deleted from the store.
DeleteFunc: ac.deleteApplicationNotification,
})
ac.syncHandler = ac.syncApplication
return ac
}
// Run begins watching and syncing.
func (ac *ApplicationController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting application controller")
go wait.Until(ac.worker, time.Second, stopCh)
<-stopCh
glog.Infof("Shutting down application controller")
}
func (ac *ApplicationController) worker() {
work := func() bool {
key, quit := ac.queue.Get()
if quit {
return true
}
defer ac.queue.Done(key)
ac.syncHandler(key.(string))
return false
}
for {
if quit := work(); quit {
return
}
}
}
func (ac *ApplicationController) enqueueApplication(application *extensions.Application) {
key, err := controller.KeyFunc(application)
if err != nil {
glog.Errorf("Couldn't get key for object %#v: %v", application, err)
return
}
ac.queue.Add(key)
}
func (ac *ApplicationController) addApplicationNotification(obj interface{}) {
d := obj.(*extensions.Application)
glog.V(4).Infof("Adding application %s", d.Name)
ac.enqueueApplication(d)
}
func (ac *ApplicationController) updateApplicationNotification(old, cur interface{}) {
oldD := old.(*extensions.Application)
glog.V(4).Infof("Updating application %s", oldD.Name)
// Resync on application object relist.
ac.enqueueApplication(cur.(*extensions.Application))
}
func (ac *ApplicationController) deleteApplicationNotification(obj interface{}) {
d, ok := obj.(*extensions.Application)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
d, ok = tombstone.Obj.(*extensions.Application)
if !ok {
glog.Errorf("Tombstone contained object that is not a Application %#v", obj)
return
}
}
glog.V(4).Infof("Deleting application %s", d.Name)
ac.enqueueApplication(d)
}
func (ac *ApplicationController) syncApplication(key string) error {
segs := strings.Split(key, "/")
namespace := segs[0]
name := segs[1]
application, err := ac.client.Extensions().Applications(namespace).Get(name)
if err != nil {
return err
}
glog.V(0).Infof("This is from syncApplication: %s/%s", application.Namespace, application.Name)
return nil
}

该controller实现的功能很简单,不断在日志中记录("This is from syncApplication: %s/%s", application.Namespace, application.Name)

当然,application_controller.go中用到了informers.ApplicationInformer,但还没定义。

informers.ApplicationInformer

在/pkg/controller/informers/extensions.go中定义ApplicationInformer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// ApplicationInformer is a type of SharedIndexInformer which watches and lists all deployments.
type ApplicationInformer interface {
Informer() cache.SharedIndexInformer
Lister() *cache.StoreToApplicationLister
}
type applicationInformer struct {
*sharedInformerFactory
}
func (f *applicationInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&extensions.Application{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Extensions().Applications(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Extensions().Applications(api.NamespaceAll).Watch(options)
},
},
&extensions.Application{},
// TODO remove this. It is hardcoded so that "Waiting for the second deployment to clear overlapping annotation" in
// "overlapping deployment should not fight with each other" will work since it requires a full resync to work properly.
30*time.Second,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
func (f *applicationInformer) Lister() *cache.StoreToApplicationLister {
informer := f.Informer()
return &cache.StoreToApplicationLister{Indexer: informer.GetIndexer()}
}

可以看到,informers.ApplicationInformer中用到了cache.StoreToApplicationLister,但还未定义。

StoreToApplicationLister

修改pkg/client/cache/listers_extensions.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type StoreToApplicationLister struct {
Indexer Indexer
}
func (s *StoreToApplicationLister) List(selector labels.Selector) (ret []*extensions.Application, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*extensions.Application))
})
return ret, err
}
func (s *StoreToApplicationLister) Applications(namespace string) storeApplicationsNamespacer {
return storeApplicationsNamespacer{Indexer: s.Indexer, namespace: namespace}
}
type storeApplicationsNamespacer struct {
Indexer Indexer
namespace string
}
func (s storeApplicationsNamespacer) List(selector labels.Selector) (ret []*extensions.Application, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*extensions.Application))
})
return ret, err
}
func (s storeApplicationsNamespacer) Get(name string) (*extensions.Application, error) {
obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(extensions.Resource("application"), name)
}
return obj.(*extensions.Application), nil
}

informer factory

现在我们有了ApplicationInformer,可以在informer factory中注册ApplicationInformer了,修改/pkg/controller/informers/factory.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type SharedInformerFactory interface {
// Start starts informers that can start AFTER the API server and controllers have started
Start(stopCh <-chan struct{})
ForResource(unversioned.GroupResource) (GenericInformer, error)
// when you update these, update generic.go/ForResource, same package
......
Applications() ApplicationInformer
......
)
func (f *sharedInformerFactory) Applications() ApplicationInformer {
return &applicationInformer{sharedInformerFactory: f}
}

controller cmd

修改/cmd/kube-controller-manager/app/controllermanager.go:

1
2
3
4
5
import (
......
"k8s.io/kubernetes/pkg/controller/application"
......
)

在StartControllers()中修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
......
if containsResource(resources, "applications") {
glog.Infof("Starting application controller")
go application.NewApplicationController(sharedInformers.Applications(), client("application-controller")).Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
......
}

验证

编译,启动kube-controller-manager,有如下日志重复输出:

1
2
I1209 21:28:25.478233 19090 application_controller.go:157] This is from syncApplication: default/ubuntu
I1209 21:28:25.534602 19090 application_controller.go:157] This is from syncApplication: default/ubuntu1