What Is the Event Mechanism

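The Event mechanism is how Kubernetes records things that happen in the system and hands each Event to the functions that process it. At its core it is a producer/consumer model in which every Event a producer generates is delivered to multiple consumers.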

Event Producer

An Event producer is an object that can generate Events. In Kubernetes the producer is called EventRecorder. EventRecorder is an interface, implemented by the recorderImpl struct, and is defined in /pkg/client/record/event.go:

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
    // The resulting event will be created in the same namespace as the reference object.
    Event(object runtime.Object, eventtype, reason, message string)
    // Eventf is just like Event, but with Sprintf for the message field.
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
    // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
    PastEventf(object runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string, args ...interface{})
}

recorderImpl is defined as follows:

type recorderImpl struct {
    source api.EventSource
    *watch.Broadcaster
    clock clock.Clock
}

The point to remember for now is that recorderImpl embeds a *watch.Broadcaster, that is, a pointer to a watch.Broadcaster struct.
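
Why the embedding matters can be seen in a minimal sketch. The types hub and recorder below are hypothetical stand-ins (they are not Kubernetes types); the sketch only shows that a method of an embedded pointer is promoted to the outer struct, which is what later allows the recorder to call Action() directly.

```go
package main

import "fmt"

// hub is a hypothetical stand-in for watch.Broadcaster.
type hub struct {
	actions []string
}

// Action records what it was asked to distribute.
func (h *hub) Action(kind, obj string) {
	h.actions = append(h.actions, kind+":"+obj)
}

// recorder embeds *hub, much as recorderImpl embeds *watch.Broadcaster.
type recorder struct {
	source string
	*hub
}

func main() {
	h := &hub{}
	r := &recorder{source: "demo", hub: h}
	// Action is promoted from the embedded *hub, so this is r.hub.Action(...).
	r.Action("ADDED", "pod/foo")
	fmt.Println(h.actions) // the hub that was passed in saw the call
}
```

Because the embedded field is a pointer, any other value holding the same pointer observes the same state.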

Next, look at how recorderImpl implements the functions declared in the EventRecorder interface:

// Recorder entry point 1: record an event with the current time.
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
    recorder.generateEvent(object, unversioned.Now(), eventtype, reason, message)
}

// Recorder entry point 2: like Event, but the message is built with fmt.Sprintf.
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
    recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

// Recorder entry point 3: like Eventf, but the caller supplies the timestamp.
func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string, args ...interface{}) {
    recorder.generateEvent(object, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

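Event() records a message for something that has just happened;
Eventf() does the same, but formats the message with fmt.Sprintf();
PastEventf() lets the caller supply the timestamp, so it can record something that happened in the past.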
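
The way the three entry points funnel into one function can be sketched in isolation. toyRecorder, entry and the int64 timestamps below are assumptions made for illustration only; the sketch mirrors the call structure, not the real recorderImpl.

```go
package main

import (
	"fmt"
	"time"
)

// entry is one recorded message with a Unix timestamp in seconds.
type entry struct {
	when    int64
	message string
}

// toyRecorder is hypothetical; it only mirrors the call structure of recorderImpl.
type toyRecorder struct {
	now     func() int64 // injectable clock
	entries []entry
}

// generate is the single place where an entry is actually created.
func (r *toyRecorder) generate(when int64, message string) {
	r.entries = append(r.entries, entry{when, message})
}

// Event stamps the message with the current time.
func (r *toyRecorder) Event(message string) {
	r.generate(r.now(), message)
}

// Eventf formats the message, then delegates to Event.
func (r *toyRecorder) Eventf(format string, args ...interface{}) {
	r.Event(fmt.Sprintf(format, args...))
}

// PastEventf formats the message and uses the caller's timestamp.
func (r *toyRecorder) PastEventf(when int64, format string, args ...interface{}) {
	r.generate(when, fmt.Sprintf(format, args...))
}

func main() {
	r := &toyRecorder{now: func() int64 { return time.Now().Unix() }}
	r.Event("pod created")
	r.Eventf("pulled image %s", "nginx")
	r.PastEventf(time.Now().Add(-time.Hour).Unix(), "restarted %d times", 3)
	for _, e := range r.entries {
		fmt.Println(e.when, e.message)
	}
}
```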

All three functions end up calling generateEvent():

// generateEvent builds the event and broadcasts it.
func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unversioned.Time, eventtype, reason, message string) {
    ref, err := api.GetReference(object)
    if err != nil {
        glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
        return
    }
    if !validateEventType(eventtype) {
        glog.Errorf("Unsupported event type: '%v'", eventtype)
        return
    }
    // Build the event.
    event := recorder.makeEvent(ref, eventtype, reason, message)
    event.Source = recorder.source
    go func() {
        // NOTE: events should be a non-blocking operation
        defer utilruntime.HandleCrash()
        // Hand the event to the embedded Broadcaster, which delivers it to every watcher.
        recorder.Action(watch.Added, event)
    }()
}

generateEvent() first builds an event and then calls Action() to process it. Action() is a method of watch.Broadcaster, so how the event is actually handled is covered in a later section.

In short, the recorder (as the code calls it) generates an event and hands it to Action().
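
generateEvent() calls Action() from a new goroutine with a deferred crash handler, so the caller is never held up by delivery. A rough sketch of that pattern follows. emitAll and its parameters are hypothetical, and a plain recover() stands in for utilruntime.HandleCrash(), whose exact behaviour is not shown in this article. The sketch waits at the end only so that it can report a result.

```go
package main

import (
	"fmt"
	"sync"
)

// emitAll delivers each event in its own goroutine, so one slow or panicking
// delivery cannot hold up the others. It returns how many deliveries panicked.
func emitAll(events []string, deliver func(string)) (panics int) {
	var wg sync.WaitGroup
	var mu sync.Mutex
	for _, e := range events {
		wg.Add(1)
		go func(e string) {
			defer wg.Done()
			// Assumption: recover() plays the role of a crash handler here.
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					panics++
					mu.Unlock()
				}
			}()
			deliver(e)
		}(e)
	}
	wg.Wait() // only for the demo; a real recorder would not wait
	return panics
}

func main() {
	panics := emitAll([]string{"ok", "bad", "ok"}, func(e string) {
		if e == "bad" {
			panic("handler failed on " + e)
		}
	})
	fmt.Println("recovered panics:", panics)
}
```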

Event Consumer

Events produced by the recorder need something to consume them. That consumer is EventBroadcaster, an interface defined in /pkg/client/record/event.go:

// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
    // StartEventWatcher starts sending events received from this EventBroadcaster to the given
    // event handler function. The return value can be ignored or used to stop recording, if
    // desired.
    StartEventWatcher(eventHandler func(*api.Event)) watch.Interface
    // StartRecordingToSink starts sending events received from this EventBroadcaster to the given
    // sink. The return value can be ignored or used to stop recording, if desired.
    StartRecordingToSink(sink EventSink) watch.Interface
    // StartLogging starts sending events received from this EventBroadcaster to the given logging
    // function. The return value can be ignored or used to stop recording, if desired.
    StartLogging(logf func(format string, args ...interface{})) watch.Interface
    // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
    // with the event source set to the given event source.
    NewRecorder(source api.EventSource) EventRecorder
}

EventBroadcaster is implemented by the eventBroadcasterImpl struct:

type eventBroadcasterImpl struct {
    *watch.Broadcaster
    sleepDuration time.Duration
}

Like recorderImpl, eventBroadcasterImpl (named eventBroadcaster in its method receivers) also embeds a *watch.Broadcaster.

First, look at two of eventBroadcaster's methods, StartRecordingToSink() and StartLogging():

// eventBroadcaster entry point 1: send every event to the given sink.
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
    // The default math/rand package functions aren't thread safe, so create a
    // new Rand object for each StartRecording call.
    randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
    eventCorrelator := NewEventCorrelator(clock.RealClock{})
    return eventBroadcaster.StartEventWatcher(
        func(event *api.Event) {
            recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
        })
}

// eventBroadcaster entry point 2: send every event to the given logging function.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
    return eventBroadcaster.StartEventWatcher(
        func(e *api.Event) {
            logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
        })
}

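Each of these functions defines its own event handler (one writes the event to an EventSink, which is typically what persists it; the other writes it to a log) and passes that handler to StartEventWatcher():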

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*api.Event)) watch.Interface {
    // Call Watch() to register a new watcher.
    watcher := eventBroadcaster.Watch()
    go func() {
        defer utilruntime.HandleCrash()
        for {
            // Receive from watcher.ResultChan() and pass each event to eventHandler.
            watchEvent, open := <-watcher.ResultChan()
            if !open {
                return
            }
            // The type assertion watchEvent.Object.(*api.Event) converts the
            // interface value to its concrete type.
            event, ok := watchEvent.Object.(*api.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            // Handle the event.
            eventHandler(event)
        }
    }()
    return watcher
}

StartEventWatcher() first creates an event watcher, then starts a goroutine that listens on the watcher and processes each event.

So eventBroadcaster creates an event watcher, keeps consuming events from it, and calls a caller-supplied function on each one.
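
The consume-and-dispatch loop can be reproduced with nothing but a channel. In the sketch below, podEvent and startWatcher are hypothetical names and the channel carries interface{} values in place of watch.Event. It is meant only to show the comma-ok receive, the type assertion, and the pluggable handler.

```go
package main

import "fmt"

// podEvent is a hypothetical stand-in for *api.Event.
type podEvent struct{ reason string }

// startWatcher consumes ch in a goroutine and passes every *podEvent to
// handler. The returned channel is closed once ch is closed and drained.
func startWatcher(ch <-chan interface{}, handler func(*podEvent)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			obj, open := <-ch
			if !open {
				return // the source was closed: stop consuming
			}
			ev, ok := obj.(*podEvent)
			if !ok {
				continue // skip anything that is not a *podEvent
			}
			handler(ev)
		}
	}()
	return done
}

func main() {
	ch := make(chan interface{}, 3)
	ch <- &podEvent{"Scheduled"}
	ch <- "not an event"
	ch <- &podEvent{"Started"}
	close(ch)
	// A logging handler; a sink handler would be plugged in the same way.
	<-startWatcher(ch, func(e *podEvent) { fmt.Println("handled:", e.reason) })
}
```

Swapping the handler is all that distinguishes one consumer from another, which is how StartLogging() and StartRecordingToSink() can share StartEventWatcher().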

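This raises a question. As the producer, the recorder calls Action() to hand off an event; as the consumer, eventBroadcaster receives events through an event watcher. Something must therefore connect Action() to the event watcher. Since recorder and eventBroadcaster both embed a *watch.Broadcaster, a reasonable guess is that watch.Broadcaster is the bridge between them. The interface comment on NewRecorder(), which says the returned recorder sends events to this EventBroadcaster, fits that guess.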

Broadcaster

Broadcaster is a general-purpose module that delivers an obj to multiple watchers, so it can be thought of as a distributor. It is defined in /pkg/watch/mux.go:

// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
    // TODO: see if this lock is needed now that new watchers go through
    // the incoming channel.
    lock sync.Mutex
    watchers map[int64]*broadcasterWatcher
    nextWatcher int64
    distributing sync.WaitGroup
    incoming chan Event
    // How large to make watcher's channel.
    watchQueueLength int
    // If one of the watch channels is full, don't wait for it to become empty.
    // Instead just deliver it to the watchers that do have space in their
    // channels and move on to the next event.
    // It's more fair to do this on a per-watcher basis than to do it on the
    // "incoming" channel, which would allow one slow watcher to prevent all
    // other watchers from getting new events.
    fullChannelBehavior FullChannelBehavior
}

Broadcaster contains an incoming channel and a map of broadcasterWatcher values (from here on, "watcher" means broadcasterWatcher). An obj is first buffered in the incoming channel and then distributed to each watcher.

First, the constructor:

// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
// It is guaranteed that events will be distributed in the order in which they occur,
// but the order in which a single event is distributed among all of the watchers is unspecified.
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
    m := &Broadcaster{
        watchers: map[int64]*broadcasterWatcher{},
        incoming: make(chan Event, incomingQueueLength),
        watchQueueLength: queueLength,
        fullChannelBehavior: fullChannelBehavior,
    }
    m.distributing.Add(1)
    go m.loop()
    return m
}

As the code shows, NewBroadcaster() not only creates the Broadcaster but also starts loop() in a goroutine.

loop() consumes objs from the incoming channel and distributes each one.

// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
    // Deliberately not catching crashes here. Yes, bring down the process if there's a
    // bug in watch.Broadcaster.
    // Keep receiving from incoming until the channel is closed.
    for {
        event, ok := <-m.incoming
        if !ok {
            break
        }
        if event.Type == internalRunFunctionMarker {
            event.Object.(functionFakeRuntimeObject)()
            continue
        }
        // Distribute the event to every watcher.
        m.distribute(event)
    }
    m.closeAll()
    m.distributing.Done()
}
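
The interplay between the constructor, loop() and the distributing WaitGroup may be easier to follow in a stripped-down form. The sketch below is an approximation under stated assumptions: miniBroadcaster, newMini, shutdown and drain are hypothetical names, events are plain strings, and the set of watchers is fixed up front.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBroadcaster is a hypothetical, reduced model of watch.Broadcaster.
type miniBroadcaster struct {
	incoming     chan string
	outputs      []chan string
	distributing sync.WaitGroup
}

// newMini creates the broadcaster and starts its loop, as NewBroadcaster does.
func newMini(watchers, queueLen int) *miniBroadcaster {
	m := &miniBroadcaster{incoming: make(chan string, queueLen)}
	for i := 0; i < watchers; i++ {
		m.outputs = append(m.outputs, make(chan string, queueLen))
	}
	m.distributing.Add(1)
	go m.loop()
	return m
}

// loop forwards every incoming event to every output, then closes them all.
func (m *miniBroadcaster) loop() {
	defer m.distributing.Done()
	for ev := range m.incoming {
		for _, out := range m.outputs {
			out <- ev
		}
	}
	for _, out := range m.outputs {
		close(out)
	}
}

// shutdown closes incoming and waits until loop has finished distributing.
func (m *miniBroadcaster) shutdown() {
	close(m.incoming)
	m.distributing.Wait()
}

// drain reads everything that is left in a closed channel.
func drain(ch <-chan string) []string {
	var got []string
	for ev := range ch {
		got = append(got, ev)
	}
	return got
}

func main() {
	m := newMini(2, 4)
	m.incoming <- "created"
	m.incoming <- "started"
	m.shutdown()
	for i, out := range m.outputs {
		fmt.Println("watcher", i, "got", drain(out))
	}
}
```

Closing the incoming channel is what ends the loop, and the WaitGroup lets the caller wait until every queued event has been handed out.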

Now the distribute() function:

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    if m.fullChannelBehavior == DropIfChannelFull {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <-w.stopped:
            default: // Don't block if the event can't be queued.
            }
        }
    } else {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <-w.stopped:
            }
        }
    }
}

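distribute() puts the event into every watcher registered with the Broadcaster. When fullChannelBehavior is DropIfChannelFull, a watcher whose channel is full simply misses the event; otherwise distribute() blocks until that watcher has room or has been stopped. As for the watcher itself, the producer writes into the watcher's result channel, and the consumer obtains that channel through watcher.ResultChan() and reads from it.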
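
The difference between the two branches comes down to whether the select has a default case. A small sketch, with the hypothetical helper offer standing in for one iteration of the loop inside distribute():

```go
package main

import "fmt"

// offer tries to queue ev on out and reports whether it was queued.
// With drop set it never blocks: if out is full, the event is discarded.
// Without drop it blocks until out has room or stopped is closed.
func offer(out chan<- string, stopped <-chan struct{}, ev string, drop bool) bool {
	if drop {
		select {
		case out <- ev:
			return true
		case <-stopped:
			return false
		default: // channel full: give up immediately
			return false
		}
	}
	select {
	case out <- ev:
		return true
	case <-stopped:
		return false
	}
}

func main() {
	out := make(chan string, 1) // room for exactly one event
	stopped := make(chan struct{})
	fmt.Println("first queued:", offer(out, stopped, "a", true))
	fmt.Println("second queued:", offer(out, stopped, "b", true))
}
```

With the drop behaviour a slow watcher loses events but never delays the others; with the blocking behaviour no event is lost, at the cost of one slow watcher stalling the whole loop.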

So when is an event put into the incoming channel? The answer is Action(), mentioned earlier:

// Action distributes the given event among all watchers.
// It does so by putting the event into the incoming channel.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
    m.incoming <- Event{action, obj}
}

Now let's see how Broadcaster manages its watchers.

// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Broadcaster) Watch() Interface {
    var w *broadcasterWatcher
    m.blockQueue(func() {
        m.lock.Lock()
        defer m.lock.Unlock()
        id := m.nextWatcher
        m.nextWatcher++
        w = &broadcasterWatcher{
            result: make(chan Event, m.watchQueueLength),
            stopped: make(chan struct{}),
            id: id,
            m: m,
        }
        m.watchers[id] = w
    })
    return w
}

Watch() creates a new broadcasterWatcher and adds it to the Broadcaster's watchers map. The id stored in the broadcasterWatcher is the same as its key in the watchers map.
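
Watch() wraps the registration in blockQueue(), which is not shown in this article. Judging from loop(), which runs any event marked internalRunFunctionMarker as a function, a plausible reading is that blockQueue() sends the function through the incoming channel and waits for the loop to run it, so that a new watcher is registered at a well-defined point in the event stream. The sketch below illustrates that idea only; item, queue and demo are hypothetical, and the real implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// item is either a plain event or, when run is non-nil, a function that the
// loop should execute instead of distributing.
type item struct {
	event string
	run   func()
}

type queue struct {
	incoming chan item
	log      []string // touched only by the loop goroutine
	done     chan struct{}
}

func newQueue() *queue {
	q := &queue{incoming: make(chan item, 8), done: make(chan struct{})}
	go q.loop()
	return q
}

// loop handles items strictly in the order they were queued.
func (q *queue) loop() {
	defer close(q.done)
	for it := range q.incoming {
		if it.run != nil {
			it.run()
			continue
		}
		q.log = append(q.log, "event "+it.event)
	}
}

// blockQueue queues f behind everything already sent and returns only after
// the loop has executed it.
func (q *queue) blockQueue(f func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	q.incoming <- item{run: func() {
		defer wg.Done()
		f()
	}}
	wg.Wait()
}

// demo queues an event, registers a watcher, queues another event, and
// returns the order in which the loop saw them.
func demo() []string {
	q := newQueue()
	q.incoming <- item{event: "a"}
	q.blockQueue(func() { q.log = append(q.log, "watcher registered") })
	q.incoming <- item{event: "b"}
	close(q.incoming)
	<-q.done
	return q.log
}

func main() {
	fmt.Println(demo())
}
```

Under this reading, a watcher added this way sees only events queued after its registration, which matches the note in the Watch() comment that new watchers receive only new events.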

// Stop stops watching and removes mw from its list.
func (mw *broadcasterWatcher) Stop() {
    mw.stop.Do(func() {
        close(mw.stopped)
        mw.m.stopWatching(mw.id)
    })
}

func (m *Broadcaster) stopWatching(id int64) {
    m.lock.Lock()
    defer m.lock.Unlock()
    w, ok := m.watchers[id]
    if !ok {
        // No need to do anything, it's already been removed from the list.
        return
    }
    delete(m.watchers, id)
    close(w.result)
}

Calling Stop() on a broadcasterWatcher removes it from the Broadcaster and stops it.
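
The registration and removal logic can be exercised on its own. The following sketch uses hypothetical types (registry, watcher) and string events; it is meant to show why the sync.Once makes a repeated Stop() harmless and why the consumer sees its result channel closed.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the watcher bookkeeping in Broadcaster.
type registry struct {
	mu       sync.Mutex
	watchers map[int64]*watcher
	next     int64
}

type watcher struct {
	id      int64
	result  chan string
	stopped chan struct{}
	once    sync.Once
	reg     *registry
}

func newRegistry() *registry {
	return &registry{watchers: map[int64]*watcher{}}
}

// watch registers a new watcher under the next free id.
func (r *registry) watch() *watcher {
	r.mu.Lock()
	defer r.mu.Unlock()
	w := &watcher{
		id:      r.next,
		result:  make(chan string, 1),
		stopped: make(chan struct{}),
		reg:     r,
	}
	r.next++
	r.watchers[w.id] = w
	return w
}

// stop may be called any number of times; only the first call has an effect.
func (w *watcher) stop() {
	w.once.Do(func() {
		close(w.stopped)
		w.reg.remove(w.id)
	})
}

// remove deletes the watcher and closes its result channel.
func (r *registry) remove(id int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	w, ok := r.watchers[id]
	if !ok {
		return // already removed
	}
	delete(r.watchers, id)
	close(w.result)
}

func (r *registry) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.watchers)
}

func main() {
	r := newRegistry()
	a, b := r.watch(), r.watch()
	a.stop()
	a.stop() // no panic: sync.Once guards the double close
	_, open := <-a.result
	fmt.Println("watchers left:", r.count(), "a.result open:", open, "b id:", b.id)
}
```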

To summarize Broadcaster: Action() puts an obj into the incoming channel, loop() consumes objs from that channel, and each obj is then distributed to all registered broadcasterWatchers.

Usage

The demo below is based on /pkg/client/record/event_test.go and shows how to use the Kubernetes Event mechanism:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/client/record"
)

// testEventSink is a sink that only prints which method was called.
type testEventSink struct {
}

// Create records the event for testing.
func (t *testEventSink) Create(e *api.Event) (*api.Event, error) {
    fmt.Println("Create")
    return e, nil
}

// Update records the event for testing.
func (t *testEventSink) Update(e *api.Event) (*api.Event, error) {
    fmt.Println("Update")
    return e, nil
}

// Patch records the event for testing.
func (t *testEventSink) Patch(e *api.Event, p []byte) (*api.Event, error) {
    fmt.Println("Patch")
    return e, nil
}

func main() {
    // ref := &api.ObjectReference{
    //     Kind: "Pod",
    //     Name: "foo",
    //     Namespace: "baz",
    //     UID: "bar",
    //     APIVersion: "version",
    // }
    testPod := &api.Pod{
        TypeMeta: unversioned.TypeMeta{
            Kind: "Pod",
            APIVersion: "v1",
        },
        ObjectMeta: api.ObjectMeta{
            Name: "foo",
            Namespace: "baz",
        },
    }
    eventBroadcaster := record.NewBroadcaster()
    // Consumer 1: print each event directly, one per line.
    eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
        fmt.Printf(formatter+"\n", args...)
    })
    // Consumer 2: hand each event to testEventSink.
    eventBroadcaster.StartRecordingToSink(&testEventSink{})
    recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "testComponent"})
    // Record an event.
    recorder.Event(testPod, api.EventTypeNormal, "Get", "Get a pod")
    // Events are delivered asynchronously, so keep the process alive.
    select {}
}

Output (the two consumers run in separate goroutines, so the order of the two lines may differ):

Create
Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Get' Get a pod