
Analyzing the event mechanism in Kubernetes

    silenceper · 2020-03-06 21:54:44 +08:00 · 4334 clicks
    This is a topic created 2127 days ago; some of the information in it may have changed.

    With the kubectl describe [resource] command we can see Event output, and we often rely on events for troubleshooting: from the events we can trace a Pod's whole lifecycle, and they provide a data source for service observability. Events clearly play an important role in Kubernetes.

    (figure: event output shown by kubectl describe)

    Events are not something only the kubelet produces: the event-related operations are encapsulated in the client-go/tools/record package, so we can also write custom events from our own code.

    Now let's lift the veil on events step by step.

    Event definition

    An event is itself a resource object, stored in etcd via the apiserver, so we can also view event objects with the kubectl get event command.
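    As an aside, because Event is an ordinary API object, it can be listed from our own code just like any other resource. Below is a minimal, illustrative sketch (not from the post); the kubeconfig path and namespace are placeholders, and it assumes a 1.17-era client-go (newer versions also take a context.Context in List):

    package main

    import (
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // placeholder kubeconfig path
        config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
        if err != nil {
            panic(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            panic(err)
        }

        // roughly what `kubectl get event -n default` shows
        events, err := clientset.CoreV1().Events("default").List(metav1.ListOptions{})
        if err != nil {
            panic(err)
        }
        for _, e := range events.Items {
            fmt.Printf("%s\t%s\t%s\n", e.Type, e.Reason, e.Message)
        }
    }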

    Here is the YAML of an event:

    apiVersion: v1
    count: 1
    eventTime: null
    firstTimestamp: "2020-03-02T13:08:22Z"
    involvedObject:
      apiVersion: v1
      kind: Pod
      name: example-foo-d75d8587c-xsf64
      namespace: default
      resourceVersion: "429837"
      uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
    kind: Event
    lastTimestamp: "2020-03-02T13:08:22Z"
    message: Pod sandbox changed, it will be killed and re-created.
    metadata:
      creationTimestamp: "2020-03-02T13:08:30Z"
      name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
      namespace: default
      resourceVersion: "479466"
      selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
      uid: 9fe6f72a-341d-4c49-960b-e185982d331a
    reason: SandboxChanged
    reportingComponent: ""
    reportingInstance: ""
    source:
      component: kubelet
      host: minikube
    type: Normal

    Key fields:

    • involvedObject: the resource object that triggered the event
    • lastTimestamp: the time the event was last triggered
    • message: the event description
    • metadata: the event's metadata, such as name and namespace
    • reason: the reason for the event
    • source: the component that reported the event, e.g. the kubelet on a particular node
    • type: the event type, Normal or Warning

    The full definition of the Event fields is here: types.go#L5078

    Next, let's see how events are written.

    Writing events

    1. We take the kubelet as the example of how events are written.

    2. The code in this post is from Kubernetes 1.17.3.

    Let's start with a diagram of the whole flow (figure: event processing flow).

    Creating the client that records events:
    kubelet/app/server.go#L461

    // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
    func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
        if kubeDeps.Recorder != nil {
            return
        }
        // create the event broadcaster
        eventBroadcaster := record.NewBroadcaster()
        // create the EventRecorder
        kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
        // send events to the log
        eventBroadcaster.StartLogging(klog.V(3).Infof)
        if kubeDeps.EventClient != nil {
            klog.V(4).Infof("Sending events to api server.")
            // send events to the apiserver
            eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
        } else {
            klog.Warning("No api server defined - no events will be sent to API server.")
        }
    }

    makeEventRecorder creates an EventBroadcaster, an event broadcaster that provides the StartLogging and StartRecordingToSink handlers, which send events to the log and to the apiserver respectively.
    NewRecorder then creates an EventRecorder instance, which provides methods such as Event and Eventf for recording events.
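    The same pattern works outside the kubelet. Here is a minimal, illustrative sketch of recording a custom event from our own component with client-go/tools/record; the kubeconfig path, namespace, pod name, component name and reason are placeholders, and the calls assume the 1.17-era client-go API used in this post (newer versions also take a context.Context in Get):

    package main

    import (
        "time"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/scheme"
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/client-go/tools/record"
        "k8s.io/klog"
    )

    func main() {
        config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // placeholder path
        if err != nil {
            klog.Fatal(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            klog.Fatal(err)
        }

        // Same shape as makeEventRecorder: broadcaster + log sink + apiserver sink + recorder.
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartLogging(klog.Infof)
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "my-demo-component"})

        // The object the event is "about" becomes involvedObject in the stored Event.
        pod, err := clientset.CoreV1().Pods("default").Get("example-foo-d75d8587c-xsf64", metav1.GetOptions{})
        if err != nil {
            klog.Fatal(err)
        }
        recorder.Eventf(pod, corev1.EventTypeNormal, "CustomReason", "custom event for pod %s", pod.Name)

        // Recording is asynchronous; give the sink goroutine a moment before exiting.
        time.Sleep(2 * time.Second)
    }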

    EventBroadcaster

    Let's look at the EventBroadcaster interface definition: event.go#L113

    // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
    type EventBroadcaster interface {
        StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
        StartRecordingToSink(sink EventSink) watch.Interface
        StartLogging(logf func(format string, args ...interface{})) watch.Interface
        NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
        Shutdown()
    }

    The concrete implementation is the eventBroadcasterImpl struct, which implements each of these methods.

    StartLogging and StartRecordingToSink are what consume the events, while EventRecorder writes them; the channel in between forms a producer-consumer model.

    EventRecorder

    Let's first look at the EventRecorder interface definition: event.go#L88. It provides the following four methods:

    // EventRecorder knows how to record events on behalf of an EventSource.
    type EventRecorder interface {
        // Event constructs an event from the given information and puts it in the queue for sending.
        // 'object' is the object this event is about. Event will make a reference-- or you may also
        // pass a reference to the object directly.
        // 'type' of this event, and can be one of Normal, Warning. New types could be added in future
        // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
        // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
        // to automate handling of events, so imagine people writing switch statements to handle them.
        // You want to make that easy.
        // 'message' is intended to be human readable.
        //
        // The resulting event will be created in the same namespace as the reference object.
        Event(object runtime.Object, eventtype, reason, message string)

        // Eventf is just like Event, but with Sprintf for the message field.
        Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

        // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
        PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

        // AnnotatedEventf is just like eventf, but with annotations attached
        AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
    }

    Key parameters:

    • object corresponds to involvedObject in the event resource
    • eventtype corresponds to type in the event resource; one of Normal or Warning
    • reason: the reason for the event
    • message: the event message

    Let's trace what happens when we call Event(object runtime.Object, eventtype, reason, message string).
    All of these methods eventually end up in generateEvent: event.go#L316

    func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
        .....
        event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
        event.Source = recorder.source
        go func() {
            // NOTE: events should be a non-blocking operation
            defer utilruntime.HandleCrash()
            recorder.Action(watch.Added, event)
        }()
    }

    The event is finally handed to recorder.Action inside a goroutine, which guarantees that every call to an Event method is non-blocking.
    makeEvent simply constructs an event object; the event name is generated from the InvolvedObject's name plus a timestamp:

    Note: for events produced by non-namespaced resources, the event's namespace is default.

    func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
        t := metav1.Time{Time: recorder.clock.Now()}
        namespace := ref.Namespace
        if namespace == "" {
            namespace = metav1.NamespaceDefault
        }
        return &v1.Event{
            ObjectMeta: metav1.ObjectMeta{
                Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
                Namespace:   namespace,
                Annotations: annotations,
            },
            InvolvedObject: *ref,
            Reason:         reason,
            Message:        message,
            FirstTimestamp: t,
            LastTimestamp:  t,
            Count:          1,
            Type:           eventtype,
        }
    }
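    As a quick sanity check of the name format, the illustrative snippet below (the pod name is taken from the sample YAML above) reproduces the %v.%x pattern, which yields names like example-foo-d75d8587c-xsf64.15f87ea1df862b64, i.e. the InvolvedObject name plus the creation time in nanoseconds rendered as hex:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Same format string as makeEvent: <involvedObject name>.<UnixNano in hex>.
        name := "example-foo-d75d8587c-xsf64"
        fmt.Printf("%v.%x\n", name, time.Now().UnixNano())
        // prints something like: example-foo-d75d8587c-xsf64.15f87ea1df862b64
    }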

    Following the Action method one step further: apimachinery/blob/master/pkg/watch/mux.go#L188:23

    // Action distributes the given event among all watchers.
    func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
        m.incoming <- Event{action, obj}
    }

    The event is written into a channel.
    Note:
    this Action method belongs to the apimachinery package. The recorderImpl struct embeds *watch.Broadcaster as an anonymous field, and that Broadcaster is assigned in NewRecorder; it is in fact the very same Broadcaster held by eventBroadcasterImpl.
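    To make the method promotion explicit, here is a tiny self-contained sketch (stand-in types, not the real client-go/apimachinery ones) of why recorder.Action works at all: embedding a pointer to a broadcaster-like struct promotes its Action method onto the recorder:

    package main

    import "fmt"

    // stand-in for watch.Broadcaster: owns the incoming channel and the Action method
    type Broadcaster struct {
        incoming chan string
    }

    func (b *Broadcaster) Action(obj string) {
        b.incoming <- obj
    }

    // stand-in for recorderImpl: embeds *Broadcaster anonymously, so Action is promoted
    type recorder struct {
        *Broadcaster
    }

    func main() {
        b := &Broadcaster{incoming: make(chan string, 1)}
        r := recorder{Broadcaster: b} // NewRecorder-style assignment: both share the same broadcaster
        r.Action("some event")        // promoted method: writes into b.incoming
        fmt.Println(<-b.incoming)
    }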

    At this point it is clear that events end up in the Broadcaster's incoming channel; next, let's see how they are consumed.

    Consuming events

    The StartLogging and StartRecordingToSink calls made in makeEventRecorder are what actually consume the events.

    • StartLogging writes events straight to the log
    • StartRecordingToSink writes events to the apiserver

    Both methods internally call StartEventWatcher, passing in an eventHandler that processes each event:

    func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
        watcher := e.Watch()
        go func() {
            defer utilruntime.HandleCrash()
            for watchEvent := range watcher.ResultChan() {
                event, ok := watchEvent.Object.(*v1.Event)
                if !ok {
                    // This is all local, so there's no reason this should
                    // ever happen.
                    continue
                }
                eventHandler(event)
            }
        }()
        return watcher
    }

    watcher.ResultChan is where the events arrive: in a goroutine, func (m *Broadcaster) loop() ==> func (m *Broadcaster) distribute(event Event) writes each event into broadcasterWatcher.result.
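    Conceptually, loop() and distribute() implement a simple fan-out: one incoming channel feeding every registered watcher's result channel. A minimal sketch of that pattern (stand-in types, not the real apimachinery code):

    package main

    import "fmt"

    type event struct{ msg string }

    type watcher struct{ result chan event }

    func main() {
        incoming := make(chan event)
        watchers := []watcher{
            {result: make(chan event, 1)}, // e.g. the StartLogging watcher
            {result: make(chan event, 1)}, // e.g. the StartRecordingToSink watcher
        }

        // loop(): consume incoming and distribute() each event to all watchers
        go func() {
            for ev := range incoming {
                for _, w := range watchers {
                    w.result <- ev
                }
            }
        }()

        incoming <- event{msg: "SandboxChanged"}
        for _, w := range watchers {
            fmt.Println((<-w.result).msg) // every handler sees the same event
        }
    }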

    Let's focus on the eventHandler supplied by StartRecordingToSink, the recordToSink method:

    func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
        // Make a copy before modification, because there could be multiple listeners.
        // Events are safe to copy like this.
        eventCopy := *event
        event = &eventCopy
        result, err := eventCorrelator.EventCorrelate(event)
        if err != nil {
            utilruntime.HandleError(err)
        }
        if result.Skip {
            return
        }
        tries := 0
        for {
            if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
                break
            }
            tries++
            if tries >= maxTriesPerEvent {
                klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
                break
            }
            // Randomize the first sleep so that various clients won't all be
            // synced up if the master goes down.
            // (jitter on the first retry keeps clients from resending all their events
            // at the same moment after an apiserver restart)
            if tries == 1 {
                time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
            } else {
                time.Sleep(sleepDuration)
            }
        }
    }

    The event first goes through eventCorrelator.EventCorrelate(event) for preprocessing, which mainly aggregates similar events (too many events would put pressure on etcd and the apiserver, and would also make a pod's event list hard to read).

    The for loop that follows handles retries, with a maximum of 12 attempts; recordEvent is what actually writes the event to the apiserver.

    Event processing

    Let's look at the EventCorrelate method:

    // EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
    func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
        if newEvent == nil {
            return nil, fmt.Errorf("event is nil")
        }
        aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
        observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
        if c.filterFunc(observedEvent) {
            return &EventCorrelateResult{Skip: true}, nil
        }
        return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
    }

    It calls three methods in turn, aggregator.EventAggregate, logger.eventObserve, and filterFunc, which do the following:

    1. aggregator.EventAggregate: aggregates events. If 10 similar events (events whose key fields are all identical except for message and the timestamps) have occurred in the last 10 minutes, the aggregator sets their message to (combined from similar events) + event.Message.
    2. logger.eventObserve: for identical events, and for similar events merged by the aggregator, it records how many times the event occurred by incrementing the Count field.
    3. filterFunc: implements token-bucket rate limiting; events exceeding the configured rate are dropped, protecting the apiserver (see the sketch right after this list).
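    As a sketch of what such a token-bucket filter looks like, client-go ships a token-bucket rate limiter in k8s.io/client-go/util/flowcontrol; the qps and burst values below are made up for the demo and are not the event filter's actual defaults:

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/flowcontrol"
    )

    func main() {
        // 0.5 tokens per second, burst of 3 -- illustrative values only
        limiter := flowcontrol.NewTokenBucketRateLimiter(0.5, 3)

        for i := 0; i < 5; i++ {
            if limiter.TryAccept() {
                fmt.Println("event", i, "accepted (would be sent to the apiserver)")
            } else {
                fmt.Println("event", i, "dropped by the rate limiter")
            }
        }
    }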

    Now the main part, the aggregator.EventAggregate method:

    func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
        now := metav1.NewTime(e.clock.Now())
        var record aggregateRecord
        // eventKey is the full cache key for this event
        // (eventKey combines all fields except the timestamps)
        eventKey := getEventKey(newEvent)
        // aggregateKey is for the aggregate event, if one is needed.
        // (aggregateKey combines all fields except message and timestamps; localKey is the message)
        aggregateKey, localKey := e.keyFunc(newEvent)

        // Do we have a record of similar events in our cache?
        e.Lock()
        defer e.Unlock()
        // look up aggregateKey in the cache; identical or similar events are kept in the cache
        value, found := e.cache.Get(aggregateKey)
        if found {
            record = value.(aggregateRecord)
        }

        // if the previous similar event is more than 10 minutes old, start a fresh
        // localKeys set (the set stores messages)
        maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
        interval := now.Time.Sub(record.lastTimestamp.Time)
        if interval > maxInterval {
            record = aggregateRecord{localKeys: sets.NewString()}
        }

        // Write the new event into the aggregation record and put it on the cache
        // (insert localKey, i.e. the message, into the set; an identical message simply overwrites)
        record.localKeys.Insert(localKey)
        record.lastTimestamp = now
        e.cache.Add(aggregateKey, record)

        // If we are not yet over the threshold for unique events, don't correlate them
        // (i.e. check whether the localKeys set already holds 10 similar events)
        if uint(record.localKeys.Len()) < e.maxEvents {
            return newEvent, eventKey
        }

        // do not grow our local key set any larger than max
        record.localKeys.PopAny()

        // create a new aggregate event, and return the aggregateKey as the cache key
        // (so that it can be overwritten.)
        eventCopy := &v1.Event{
            ObjectMeta: metav1.ObjectMeta{
                Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
                Namespace: newEvent.Namespace,
            },
            Count:          1,
            FirstTimestamp: now,
            InvolvedObject: newEvent.InvolvedObject,
            LastTimestamp:  now,
            // messageFunc prefixes the message with: (combined from similar events):
            Message: e.messageFunc(newEvent),
            Type:    newEvent.Type,
            Reason:  newEvent.Reason,
            Source:  newEvent.Source,
        }
        return eventCopy, aggregateKey
    }

    In short, aggregator.EventAggregate uses the cache and the localKeys set to decide whether events are similar: if 10 similar events appear within 10 minutes they are merged and given the prefix, and logger.eventObserve then accumulates the count; if the message is identical as well, it simply does count++.

    Summary

    That's the whole event handling flow. To recap (it helps to follow the diagram earlier in the post):

    1. An EventRecorder object is created; its Event and related methods construct the event object.
    2. The constructed object is sent into the channel inside the EventBroadcaster.
    3. A background goroutine in the EventBroadcaster takes events off the channel and broadcasts them to the handlers registered earlier.
    4. The logging handler simply prints each event it receives.
    5. The EventSink handler preprocesses each event and then sends it to the apiserver.
    6. Preprocessing consists of three steps: aggregation, counting, and rate limiting.
    7. The apiserver receives the event and stores it in etcd.

    Looking back over the whole flow, events are not guaranteed to be written 100% of the time (as the preprocessing shows). This trade-off protects the availability of the etcd backend: events are produced very frequently across the cluster, especially when services are unstable, and compared with Deployments, Pods and other resources they are less critical, so losing some of them is acceptable.

    Original post: https://silenceper.com/blog/202003/kubernetes-event/

    2 replies    2020-03-18 22:13:52 +08:00

    better0332    #1    2020-03-08 22:36:54 +08:00
    Note that k8s events are only retained for 1 hour.

    silenceper (OP)    #2    2020-03-18 22:13:52 +08:00
    @better0332 Yes.