什么是Event机制 Event机制是Kubernetes用来记录系统发生的事件的一种机制,可以把Event发送给相关函数进行处理,其本质是一个单生产者,生产出的Event供多个消费者消费的模型。
Event生产者 Event生产者就是可以产生Event的对象。在Kubernetes中,Event生产者称为EventRecorder,EventRecorder为interface,具体由recorderImpl struct实现,定义在/pkg/client/record/event.go中:1
2
3
4
5
6
7
8
9
type EventRecorder interface {
Event(object runtime.Object, eventtype, reason, message string )
Eventf(object runtime.Object, eventtype, reason, messageFmt string , args ...interface {})
PastEventf(object runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string , args ...interface {})
}
recorderImpl的定义如下:1
2
3
4
5
type recorderImpl struct {
source api.EventSource
*watch.Broadcaster
clock clock.Clock
}
现在需要记着的是recorderImpl中嵌入了watch.Broadcaster结构体。 再来看recorderImpl对EventRecorder接口中定义的函数的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (recorder *recorderImpl) Event (object runtime.Object, eventtype, reason, message string ) {
recorder.generateEvent(object, unversioned.Now(), eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf (object runtime.Object, eventtype, reason, messageFmt string , args ...interface {}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) PastEventf (object runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string , args ...interface {}) {
recorder.generateEvent(object, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
Event()是对刚发生的消息进行记录; Eventf()可以使用fmt.Sprintf()来格式化消息; PastEventf()允许自定义消息发生的时间,来用记录已经发生过的消息。
这三个函数最后都调用了generateEvent()函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (recorder *recorderImpl) generateEvent (object runtime.Object, timestamp unversioned.Time, eventtype, reason, message string ) {
ref, err := api.GetReference(object)
if err != nil {
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'" , object, err, eventtype, reason, message)
return
}
if !validateEventType(eventtype) {
glog.Errorf("Unsupported event type: '%v'" , eventtype)
return
}
event := recorder.makeEvent(ref, eventtype, reason, message)
event.Source = recorder.source
go func () {
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
generateEvent()先生成一个event,然后调用Action()对event进行处理。Action()是watch.Broadcaster结构体的成员函数,所以具体怎么处理,稍后章节分析。
所以,recorder(按代码中来称呼)可以生成event,并把event传递给Action()处理函数。
Event消费者 recorder产生的event,需要有程序来消费,这个消费程序就是EventBroadcaster,EventBroadcaster是一个interface,定义在/pkg/client/record/event.go中:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type EventBroadcaster interface {
StartEventWatcher(eventHandler func (*api.Event) ) watch .Interface
// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
// sink . The return value can be ignored or used to stop recording , if desired .
StartRecordingToSink (sink EventSink) watch .Interface
// StartLogging starts sending events received from this EventBroadcaster to the given logging
// function . The return value can be ignored or used to stop recording , if desired .
StartLogging (logf func (format string , args ...interface {}) ) watch .Interface
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source .
NewRecorder (source api.EventSource) EventRecorder
}
EventBroadcaster具体由eventBroadcasterImpl结构体实现:1
2
3
4
type eventBroadcasterImpl struct {
*watch.Broadcaster
sleepDuration time.Duration
}
和recorderImpl一样,eventBroadcasterImpl(成员函数中称为eventBroadcaster)也嵌入了一个watch.Broadcaster结构体。
先来看eventBroadcaster的成员函数StartRecordingToSink()和StartLogging():1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink (sink EventSink) watch .Interface {
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
eventCorrelator := NewEventCorrelator(clock.RealClock{})
return eventBroadcaster.StartEventWatcher(
func (event *api.Event) {
recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
})
}
func (eventBroadcaster *eventBroadcasterImpl) StartLogging (logf func (format string , args ...interface {}) ) watch .Interface {
return eventBroadcaster.StartEventWatcher(
func (e *api.Event) {
logf("Event(%#v): type: '%v' reason: '%v' %v" , e.InvolvedObject, e.Type, e.Reason, e.Message)
})
}
这两个函数自定义了event处理函数(一个记录到数据库中,一个记录到日志中),然后把处理函数作为参数传递给StartEventWatcher():1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher (eventHandler func (*api.Event) ) watch .Interface {
watcher := eventBroadcaster.Watch()
go func () {
defer utilruntime.HandleCrash()
for {
watchEvent, open := <-watcher.ResultChan()
if !open {
return
}
event, ok := watchEvent.Object.(*api.Event)
if !ok {
continue
}
eventHandler(event)
}
}()
return watcher
}
StartEventWatcher()先生成一个event watcher,然后起routine监听watcher并处理event。
所以,eventBroadcaster可以生成一个event watcher,然后不断消费watcher中的event,并调用自定义函数对event进行处理。
现在,问题来了,作为event生产者,recorder调用的是Action()来处理event;作为event的消费者,eventBroadcaster通过event watcher获取event,所以必定有地方关联Action()及event watcher。因为recorder和eventBroadcaster都具有嵌入结构体watch.Broadcaster,所以可以猜测watch.Broadcaster是recorder和eventBroadcaster之间的桥梁。
Broadcaster Broadcaster是一个通用模块,作用是把obj发送给多个watcher。所以可以把Broadcaster理解成分发者。Broadcaster定义在/pkg/watch/mux.go中:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Broadcaster struct {
lock sync.Mutex
watchers map [int64 ]*broadcasterWatcher
nextWatcher int64
distributing sync.WaitGroup
incoming chan Event
watchQueueLength int
fullChannelBehavior FullChannelBehavior
}
在Broadcaster中,包含一个incoming channel和一个broadcasterWatcher map(之后提到的watcher就是broadcasterWatcher)。obj会缓存在incoming channel中,然后再分发给各个watcher。
先来看Broadcaster的创建函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
func NewBroadcaster (queueLength int , fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map [int64 ]*broadcasterWatcher{},
incoming: make (chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1 )
go m.loop()
return m
}
可以看出,NewBroadcaster()函数在生成Broadcaster的同时,还拉起了loop()函数。
loop()函数会消费incoming channel中的obj,然后把obj进行分发。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (m *Broadcaster) loop () {
for {
event, ok := <-m.incoming
if !ok {
break
}
if event.Type == internalRunFunctionMarker {
event.Object.(functionFakeRuntimeObject)()
continue
}
m.distribute(event)
}
m.closeAll()
m.distributing.Done()
}
再来看distribute()函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (m *Broadcaster) distribute (event Event) {
m.lock.Lock()
defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
default :
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}
distribute就是把event放到Broadcaster中注册的各watcher中。关于watcher,内容生产者把内容写到watcher的result channel中,消费者通过watcher.Channel()来获取result channel,并读出内容。
那么,event是什么时候放到incoming channel中呢?答案就是之前提过的Action():1
2
3
4
5
func (m *Broadcaster) Action (action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}
现在我们来看来Broadcaster是如何管理watcher的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (m *Broadcaster) Watch () Interface {
var w *broadcasterWatcher
m.blockQueue(func () {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
w = &broadcasterWatcher{
result: make (chan Event, m.watchQueueLength),
stopped: make (chan struct {}),
id: id,
m: m,
}
m.watchers[id] = w
})
return w
}
Watch()函数会生成一个新的broadcasterWatcher,然后把watcher加入到Broadcaster的watchers map中。其中,broadcasterWatcher中的id和watcher map中的key id值一样。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (mw *broadcasterWatcher) Stop () {
mw.stop.Do(func () {
close (mw.stopped)
mw.m.stopWatching(mw.id)
})
}
func (m *Broadcaster) stopWatching (id int64 ) {
m.lock.Lock()
defer m.lock.Unlock()
w, ok := m.watchers[id]
if !ok {
return
}
delete (m.watchers, id)
close (w.result)
}
调用broadcasterWatcher的Stop()可以把broadcasterWatcher从Broadcaster中移除并停止broadcasterWatcher。
给Broadcaster做个总结,Action()函数把obj放到incoming channel中,然后有一个循环loop()消费incoming channel中的obj,然后把obj分发到多个broadcasterWatcher中。
使用 下面Demo是依据/pkg/client/record/event_test.go写的,可以说明如何使用Kubernetes的Event机制:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
)
type testEventSink struct {
}
func (t *testEventSink) Create (e *api.Event) (*api.Event, error) {
fmt.Println("Create" )
return e, nil
}
func (t *testEventSink) Update (e *api.Event) (*api.Event, error) {
fmt.Println("Update" )
return e, nil
}
func (t *testEventSink) Patch (e *api.Event, p []byte ) (*api.Event, error) {
fmt.Println("Patch" )
return e, nil
}
func main () {
testPod := &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod" ,
APIVersion: "v1" ,
},
ObjectMeta: api.ObjectMeta{
Name: "foo" ,
Namespace: "baz" ,
},
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(func (formatter string , args ...interface {}) {
fmt.Printf(formatter, args...)
})
eventBroadcaster.StartRecordingToSink(&testEventSink{})
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "testComponent" })
recorder.Event(testPod, api.EventTypeNormal, "Get" , "Get a pod" )
select {}
}
输出:1
2
Create
Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Get' Get a pod