热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

KubernetesEvent原理和源码分析

在Operator开发过程中难免会用到Event对象,所以很有必要了解Event相关细节,可以避

概述

源码版本信息

  • Project: kubernetes

  • Branch: master

  • Last commit id: d25d741c (2021-09-26)

在 Operator 开发过程中难免会用到 Event 对象,所以很有必要了解 Event 相关细节,可以避免很多 bug 的产生。client-go 在处理 Event 的时候,有这样一些特性:

  1. 如果 apiserver 失联,会重试发送 12 次,第一次间隔是 [0,10),剩余每次间隔 10s,合计110-120 s 左右如果还连不上 apiserver 就会放弃本次事件的发送;

  2. client-go 在发送 event 之前会先进行一系列预处理流程,如果相似 event 的聚合,效果就是新发送一个关于相同资源对象的 Reason 和 Message 都相同的 event,这时候新 event 的 count 就是这类事件发生的次数,LastTimestamp 是事件产生时间,FirstTimestamp 是第一次观察到这类事件的时间;并且快速发送多个一样的 event 满足一定条件时会被聚合成一个;

  3. client-go 中发送 event 的 burst 是 25,qps 是 1/300,意味着令牌桶大小是 25,5分钟产生一块新令牌,换言之快速发送 25 个 event 之后,5分钟内发送的 event 会被丢弃;

  4. 消息广播器的缓冲区大小是 1000,如果产生事件的速度太快,当 EventWatcher 来不及处理时,新产生的 event 也会被直接丢弃;

event

以 job 控制器中 event 的用法为例,大致步骤如下

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)

从这里我们可以得到几个关键信息,首先是涉及到的几个主要对象:

  • EventBroadcaster

  • EventSink

  • EventRecorder

从名字直接猜的话,也许是这样工作的:EventRecorder 产生 events,EventBroadcaster 广播 events,EventSink,不好猜,字面意思是个事件槽,大概就是事件的一个中转站,最后通过这个 Sink 事件会流转到其他地方,比如 logger 或者 apiserver?到这里为止纯属YY…… 下面具体来看。

EventBroadcaster

EventBroadcaster 用来接收 events,然后发送到一个 EventSink、watcher 或者 log 中;先看下接口定义:

  • client-go/tools/record/event.go:113

type EventBroadcaster interface {
   // 将从 EventBroadcaster 接收到的 events 丢给 eventHandler
   StartEventWatcher(eventHandler func(*v1.Event)watch.Interface 
   // 将从 EventBroadcaster 接收到的 events 丢给 EventSink
   StartRecordingToSink(sink EventSink) watch.Interface
   // 将从 EventBroadcaster 接收到的 events 丢给指定的日志函数
   StartLogging(logf func(format string, args ...interface{})watch.Interface
   // 将从 EventBroadcaster 接收到的 events 丢给指定的结构化日志函数
   StartStructuredLogging(verbosity klog.Level) watch.Interface
   // 用于获取 EventRecorderEventRecorder 可以发送 events 给 EventBroadcaster
   NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
   Shutdown()
}

这里注意不要找到 tools/events 包里去,两个包有很相似的代码,看起来是 tools/events 包已经过期了,目前引用的都是 tools/record 包。

EventBroadcaster 对应的实现是 eventBroadcasterImpl

  • client-go/tools/record/event.go:181

type eventBroadcasterImpl struct {
     *watch.Broadcaster
     sleepDuration time.Duration
     options       CorrelatorOptions
}

前面提到第一步就是 eventBroadcaster := record.NewBroadcaster()
 调用,我们下面看看这里的 New 过程

NewBroadcaster()

实例化一个 EventBroadcaster 的过程中会直接开启一个 goroutine 来从 Broadcaster.imcoming
 接收 Event,然后分发给所有的 Broadcaster.watchers

  • client-go/tools/record/event.go:159

func NewBroadcaster() EventBroadcaster {
    return &eventBroadcasterImpl{
        Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), // maxQueuedEvents == 1000
        sleepDuration: defaultSleepDuration, // 10s
    }
}

这里通过调用 watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
 来创建 Broadcaster,继续看下逻辑:

func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
   m := &Broadcaster{
      watchers:            map[int64]*broadcasterWatcher{},
      incoming:            make(chan Event, queueLength),
      stopped:             make(chan struct{}),
      watchQueueLength:    queueLength,
      fullChannelBehavior: fullChannelBehavior,
   }
   m.distributing.Add(1// wg.Add(1)
   go m.loop()
   return m
}

这里实例化了一个 Broadcaster,先看下 Broadcaster 的结构:

  • apimachinery/pkg/watch/mux.go:42

type Broadcaster struct {
   watchers     map[int64]*broadcasterWatcher
   nextWatcher  int64
   distributing sync.WaitGroup
   incoming chan Event
   stopped  chan struct{}
   watchQueueLength int
   fullChannelBehavior FullChannelBehavior
}

这里的 Event 是这个结构:

type Event struct {
   Type EventType // "ADDED" / "MODIFIED" / "DELETED" / "BOOKMARK" / "ERROR"
   // 如果 EventType 是 "ADDED"/"MODIFIED",Object 是对象的最新状态;
   // 如果 EventType 是 "DELETED",Object 是对象删除前的状态;
   // 如果 EventType 是 "BOOKMARK",Object 里只有 ResourceVersion 字段被设置,客户端会保证不会收到重复的 event 或者丢失任何一个 event
   Object runtime.Object
}

回过来看调用的 m.loop()
,loop() 方法的逻辑是从 m.incoming 接受消息,然后分发给所有的 watchers

func (m *Broadcaster) loop() {
   for event := range m.incoming { // chan Event 类型,这里的 Event apimachinery/pkg/watch.Event
      if event.Type == internalRunFunctionMarker { // 如果是 fake 类型的 Event,就直接调用其内部 Obj 的方法,下面有具体 internalRunFunctionMarker 逻辑的用途分析,在 blockQueue 方法里会讲到。
         event.Object.(functionFakeRuntimeObject)()
         continue
      }
      m.distribute(event) // 分发逻辑
   }
   m.closeAll()
   m.distributing.Done() // wg.Done()
}

继续看 distribute 逻辑,这里主要是将一个 Event 分发给所有的 watcher

func (m *Broadcaster) distribute(event Event) {
   if m.fullChannelBehavior == DropIfChannelFull { // 默认行为,如果 channel 满了不阻塞,直接丢弃消息
      for _, w := range m.watchers { // map[int64]*broadcasterWatcher
         select {
         case w.result <- event: // 将 event 丢给 broadcasterWatcher.result
         case <-w.stopped:
         default// 当 w.result 满了写不进去时直接继续下一轮循环,区别于 else 里的阻塞行为
         }
      }
   } else {
      for _, w := range m.watchers {
         select {
         case w.result <- event: // result 满了会阻塞
         case <-w.stopped:
         }
      }
   }
}

StartEventWatcher

开头提到的 job 控制器里 events 相关代码里接着两步是这样的:

eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

这两个函数调用代码都不长,主要逻辑在内部的 StartEventWatcher 中。简单看下两个方法的定义:

// client-go/tools/record/event.go:190
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
   return e.StartEventWatcher(
      func(e *v1.Event) { // 收到一个 Event,直接打印到日志里
         klog.V(verbosity).InfoS("Event occurred""object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
      })
}
// ......
// client-go/tools/record/event.go:291
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   eventCorrelator := NewEventCorrelatorWithOptions(e.options)
   return e.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, e.sleepDuration)
      })
}

我们来看 StartEventWatcher 逻辑,然后回过来聊这两个方法。

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)watch.Interface {
   watcher := e.Watch() // Watch 过程下面来看
   go func() {
      defer utilruntime.HandleCrash()
      for watchEvent := range watcher.ResultChan() { // 前面 Broadcaster 就是往这里丢的 Event
         event, ok := watchEvent.Object.(*v1.Event) // Event.Object 就是具体的 corev1.Event
         if !ok {
            // This is all local, so there's no reason this should
            // ever happen.
            continue
         }
         eventHandler(event) // 丢给 handler 函数处理
      }
   }()
   return watcher
}

StartEventWatcher()
 方法的入参是一个能处理 Event 的 handler 函数,这里的 Event 是 corev1.Event,也就是我们通过 kubectl 命令具体看到的 Event 资源对象。上面有一个 Watch()
 方法的调用,我们分析下具体内容。

Watch() 方法会 new 一个 watcher,然后加到 m.watchers map 里,返回这个 watcher,这个 watcher 不会接收到历史 events,而且会阻塞到成功加入 Broadcaster 为止。比如 Broadcaster 到 incoming 队列里已经有很多 Event 了,这时候新启动一个 watcher 直接开始工作会收到老消息,下面通过 blockQueue 逻辑实现了只接收新消息的逻辑,具体看下代码:

func (m *Broadcaster) Watch() Interface {
   var w *broadcasterWatcher
   m.blockQueue(func() { // blockQueue() 下面有分析
      id := m.nextWatcher
      m.nextWatcher++
      w = &broadcasterWatcher{
         result:  make(chan Event, m.watchQueueLength),
         stopped: make(chan struct{}),
         id:      id,
         m:       m,
      }
      m.watchers[id] = w
   })
   if w == nil {
      panic("broadcaster already stopped")
   }
   return w
}

blockQueue 用来阻塞 incoming 队列用的,就是往 incoming 里加入一个 fake 的 Event,然后挂起当前 gorouting,直到这个 Event 被处理到为止

func (m *Broadcaster) blockQueue(f func()) {
   select {
   case <-m.stopped:
      return
   default:
   }
   var wg sync.WaitGroup
   wg.Add(1)
   m.incoming <- Event{
      Type: internalRunFunctionMarker, // "internal-do-function"
      Object: functionFakeRuntimeObject(func() {
         defer wg.Done() // 这个 Event 被消费后会调用到 Done()
         f() // 阻塞结束后调用
      }),
   }
   wg.Wait() // 阻塞直到上面加入到 Event 被处理完
}

StartRecordingToSink

讲完了 StartEventWatcher 的逻辑,回过头看一下 StartRecordingToSink 的具体逻辑。StartRecordingToSink 的作用是将从指定 eventBroadcaster 接收到的消息传送到给定的 sink 中去

  • client-go/tools/record/event.go:190

func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   eventCorrelator := NewEventCorrelatorWithOptions(e.options)
   return e.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, e.sleepDuration)
      })
}

这里传给 StartEventWatcher()
 方法的 handler 函数是 recordToSink(sink, event, eventCorrelator, e.sleepDuration)
,看下这个 handler 是怎么实现的:

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
   // 修改前复制一份,因为一个 event 有多个 listener
   eventCopy := *event
   event = &eventCopy
   result, err := eventCorrelator.EventCorrelate(event) // 聚合处理等,下面会提到
   if err != nil {
      utilruntime.HandleError(err)
   }
   if result.Skip {
      return
   }
   tries := 0
   for {
      // 具体执行将 event 写到 sink 的过程,这里是在 if 的条件里,所以直到成功了才会 break
      if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
         break
      }
      tries++
      if tries >= maxTriesPerEvent { // 最多重试 12 次
         klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
         break
      }
      // 第一次 sleep 随机一点,避免 apiserver 失联的时候所有 client 一起失败
      if tries == 1 {
        time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) // 10s * [0.0,1.0)
      } else {
         time.Sleep(sleepDuration) // 10s
      }
   }
}

recordEvent

  • client-go/tools/record/event.go:238

func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
   var newEvent *v1.Event
   var err error
   if updateExistingEvent { // 如果是更新已有的 event,则调用 Patch 方法
      newEvent, err = sink.Patch(event, patch)
   }
   // 更新可能失败,因为这个 event 可能已经被删除了
   if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
      // 如果是新建,则需要确保 ResourceVersion 为空
      event.ResourceVersion = ""
      newEvent, err = sink.Create(event)
   }
   if err == nil {
      // 更新 eventCorrelator 状态
      eventCorrelator.UpdateState(newEvent)
      return true
   }

   // 连不上 apiserver 等原因引起的失败
   switch err.(type) {
   case *restclient.RequestConstructionError:
      // 这种情况重试也会失败,所以直接返回 true
      klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
      return true
   case *errors.StatusError: // 服务器端拒绝更新,放弃
      if errors.IsAlreadyExists(err) {
         klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
      } else {
         klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
      }
      return true
   case *errors.UnexpectedObjectError:
   default// http 传输问题,比如失联,需要重试

   }
   klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err)
   return false
}

EventSink

接口定义如下

  • client-go/tools/record/event.go:47

type EventSink interface {
   Create(event *v1.Event) (*v1.Event, error)
   Update(event *v1.Event) (*v1.Event, error)
   Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}

实现是

type EventSinkImpl struct {   Interface EventInterface}

回到一开始的用法:eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

可以看到 Interface 引用的是 kubeClient.CoreV1().Events("")
 ,这里的逻辑就到了 client-go 的 clientset 中了,这里的类型也就是 corev1.EventInterface
,所以上面接口的 Create、Update、Patch 也就都是通过 clientset 来实现的。

EventCorrelator

EventCorrelator 的作用是预处理所有 events,聚合频繁产生的相似的 events,将多次接受到的 events 聚合成一个等,从而降低系统压力。

上面提到一个 eventCorrelator.EventCorrelate() 调用,首先看下对象定义:

  • client-go/tools/record/events_cache.go:405

type EventCorrelator struct {
   // 过滤器
   filterFunc EventFilterFunc
   // 聚合器
   aggregator *EventAggregator
   // 观察器
   logger *eventLogger
}

EventFilterFunc

过滤器主要是限速用的,看一个具体的实现:

  • client-go/tools/record/events_cache.go:129

func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
   var record spamRecord
  // eventKey 的结构大概这样:event.Source.(Component+Host)+event.InvolvedObject.(Kind+Namespace+event+Name)+...
   eventKey := f.spamKeyFunc(event)

   f.Lock()
   defer f.Unlock()
   value, found := f.cache.Get(eventKey) // cache 是一个 LRU 缓存
   if found {
      record = value.(spamRecord)
   }

   // 没有限速器就加一个
   if record.rateLimiter == nil {
      // 默认一个 source+object 的 burst 是 25 ,qps 是 1/300(5分钟一个),也就是令牌桶初始容量是 25,然后 5 分钟才会多一个令牌进来
      record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
   }

   // 看速率是否满足要求
   filter := !record.rateLimiter.TryAccept()
   // 更新 cache
   f.cache.Add(eventKey, record)

   return filter
}

EventAggregator

聚合器的作用是将相似的 events 聚合成一个 event

聚合器定义如下

  • client-go/tools/record/events_cache.go:191

type EventAggregator struct {
   sync.RWMutex
   cache *lru.Cache
   keyFunc EventAggregatorKeyFunc
   messageFunc EventAggregatorMessageFunc
   maxEvents uint            // 当相似 event 数量超过这个最大值时就触发聚合操作,默认是 10
   maxIntervalInSeconds uint // 过了这个间隔的两个相似 event 被认为是一个新的 event,默认 10min
   clock clock.PassiveClock
}

对应的 New 函数

func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
   maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock)
 *EventAggregator
 {
   return &EventAggregator{
      cache:                lru.New(lruCacheSize), // 默认 4096
      keyFunc:              keyFunc,
      messageFunc:          messageFunc,
      maxEvents:            uint(maxEvents), // 默认是 10
      maxIntervalInSeconds: uint(maxIntervalInSeconds), // 默认 600s
      clock:                clock,
   }
}

这里的 keyFunc 如下

func EventAggregatorByReasonFunc(event *v1.Event) (stringstring) {
   return strings.Join([]string{
      event.Source.Component,
      event.Source.Host,
      event.InvolvedObject.Kind,
      event.InvolvedObject.Namespace,
      event.InvolvedObject.Name,
      string(event.InvolvedObject.UID),
      event.InvolvedObject.APIVersion,
      event.Type,
      event.Reason,
      event.ReportingController,
      event.ReportingInstance,
   },
      ""), event.Message
}

messageFunc 如下

func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
   return "(combined from similar events): " + event.Message
}

聚合过程在 EventAggregate()
 方法中实现

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
   now := metav1.NewTime(e.clock.Now())
   var record aggregateRecord // 维护了所有接收到过的 event 的 key
   // 计算这个 event 的 key
   eventKey := getEventKey(newEvent)
   // 类似这样 "(combined from similar events): " + event.Message
   aggregateKey, localKey := e.keyFunc(newEvent)

   // 查询 caches 里是否有相似 events 记录
   e.Lock()
   defer e.Unlock()
   value, found := e.cache.Get(aggregateKey)
   if found {
      record = value.(aggregateRecord)
   }

   // 看向下之前的记录是否太旧了,这个事件是 10min,如果太旧了就更新。
   maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
   interval := now.Time.Sub(record.lastTimestamp.Time)
   if interval > maxInterval {
      record = aggregateRecord{localKeys: sets.NewString()}
   }

   // 新 event 写入聚合 record 里,并且放到 cache 中
   record.localKeys.Insert(localKey)
   record.lastTimestamp = now
   e.cache.Add(aggregateKey, record)

   // 如果不重复的 events 数量小于10
   if uint(record.localKeys.Len()) < e.maxEvents {
      return newEvent, eventKey
   }

   // 保证 localKeys 不增长,pop 出来一个
   record.localKeys.PopAny()

   // 返回一个聚合后的 event 和对应的“聚合 key”
   eventCopy := &v1.Event{
      ObjectMeta: metav1.ObjectMeta{
         Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
         Namespace: newEvent.Namespace,
      },
      Count:          1,
      FirstTimestamp: now,
      InvolvedObject: newEvent.InvolvedObject,
      LastTimestamp:  now,
      Message:        e.messageFunc(newEvent),
      Type:           newEvent.Type,
      Reason:         newEvent.Reason,
      Source:         newEvent.Source,
   }
   return eventCopy, aggregateKey
}

eventLogger

观察器做的事情是将一个新产生的 Event 和 LRU 缓存里的做对比,如果 key 一致,也就是两个 Event 表示的信息一样,则更新缓存;如果不一样,就加到缓存里。

  • client-go/tools/record/events_cache.go:315

type eventLogger struct {
   sync.RWMutex
   cache *lru.Cache
   clock clock.PassiveClock
}

观察器有一个 eventObserve() 方法,如果 key 是相同的,这个方法会直接更新已经存在的 event,反之增加一个新的 event

func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
   var (
      patch []byte
      err   error
   )
   eventCopy := *newEvent // 复制一份
   event := &eventCopy

   e.Lock()
   defer e.Unlock()

   // 检查缓存里是否有需要更新的 event,这里的 key 就是前面提到的 EventAggregatorByReasonFunc() 计算出来的 key
   lastObservation := e.lastEventObservationFromCache(key)

   // 如果发现了需要更新的 event,也就是新的 event 已经存在已经老的和其各个属性都一样,Reason、Message 等都一样,而且属于同一个对象
   if lastObservation.count > 0 {
      // update the event based on the last observation so patch will work as desired
      event.Name = lastObservation.name
      event.ResourceVersion = lastObservation.resourceVersion
      event.FirstTimestamp = lastObservation.firstTimestamp // Event 构造的时候会设置 firstTimestamp 和 lastTimestamp,这里更新了 firstTimestamp
      event.Count = int32(lastObservation.count) + 1 // 计数器加1

      eventCopy2 := *event
      eventCopy2.Count = 0
      eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(00))
      eventCopy2.Message = ""

      newData, _ := json.Marshal(event)
      oldData, _ := json.Marshal(eventCopy2)
      patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
   }

   // 记录新观察到的 Event
   e.cache.Add(
      key,
      eventLog{
         count:           uint(event.Count),
         firstTimestamp:  event.FirstTimestamp,
         name:            event.Name,
         resourceVersion: event.ResourceVersion,
      },
   )
   return event, patch, err
}

EventCorrelator.EventCorrelate()

回到 EventCorrelate() 方法的实现上

  • client-go/tools/record/events_cache.go:510

func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
   if newEvent == nil {
      return nil, fmt.Errorf("event is nil")
   }
   aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent) // 聚合器处理
   observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) // 观察器处理
   if c.filterFunc(observedEvent) { // 过滤器处理
      return &EventCorrelateResult{Skip: true}, nil
   }
   return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

EventRecorder

最后两步是:

recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)

这里的 NewRecorder 定义如下

func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
   return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}

入参是 Scheme 和 EventSource,EventSource 结构很简单:

type EventSource struct {
   // Event 从哪个组件来的,比如:job-controller
   Component string `json:"component,omitempty" protobuf:"bytes,1,opt,name=component"`
   // Event 从哪个节点来的
   Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
}

看下 EventRecorder 的定义和实现,接口长这样:

type EventRecorder interface {
   // 这里的 object 是这个 event 相关的那个资源对象;eventtype 是 'Normal/Warning' 这类简单的字符串;reason 表示这个 event 产生的原因,message 是一个更详细的可读性好的描述信息
   Event(object runtime.Object, eventtype, reason, message string)
   // 和 Event() 类似,只是用来 Sprintf
   Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
   // Eventf() 基础上加了一个 annotations
   AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

对应的实现是:

type recorderImpl struct {
   scheme *runtime.Scheme
   source v1.EventSource
   *watch.Broadcaster
   clock clock.PassiveClock
}

recorderImpl.Eventf()

我们写代码时使用最多的 Eventf() 对应的就是 recorderImpl 对象的 Eventf() 方法,接下来看下 Eventf() 的具体实现。

Eventf() 方法只是简单地通过 fmt.Sprintf() 格式化字符串后调用 Event()

  • client-go/tools/record/event.go:354

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
    recorder.generateEvent(object, nil, eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
   recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

接着逻辑就到了generateEvent() 方法里

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
   ref, err := ref.GetReference(recorder.scheme, object) // 获取 object 的 ObjectReference
   if err != nil {
      klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
      return
   }

   if !util.ValidateEventType(eventtype) { // 校验 eventtype 是 "Normal"/"Warning"
      klog.Errorf("Unsupported event type: '%v'", eventtype)
      return
   }

   event := recorder.makeEvent(ref, annotations, eventtype, reason, message) // 构建 event
   event.Source = recorder.source
   // events 操作不应该阻塞,如果 event 太多的时候直接丢弃,然后打印一个日志
   if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
      klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
   }
}

这里有两个函数调用:

  • makeEvent()

  • ActionOrDrop()

makeEvent 从名字就能猜到这是构造一个 Event 对象的:

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
   t := metav1.Time{Time: recorder.clock.Now()}
   namespace := ref.Namespace
   if namespace == "" {
      namespace = metav1.NamespaceDefault
   }
   return &v1.Event{
      ObjectMeta: metav1.ObjectMeta{
         Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), // 名字是相关 object 的 Name + 时间戳
         Namespace:   namespace, // 和 ref 在同一个 namespace 下,如果 ref 没有 namespace 就放到 default 下
         Annotations: annotations, // 支持添加 annotations
      },
      InvolvedObject: *ref,
      Reason:         reason,
      Message:        message,
      FirstTimestamp: t,
      LastTimestamp:  t,
      Count:          1,
      Type:           eventtype,
   }
}

ActionOrDrop 的逻辑是尝试往 Broadcaster.incoming channel 中写入 Event,如果失败了就直接 Drop 掉

func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
   select {
   case m.incoming <- Event{action, obj}:
      return true
   default:
      return false
   }
}

(转载请保留本文原始链接 https://www.danielhu.cn/post/k8s/event/)




推荐阅读
  • 本文介绍了关系型数据库和NoSQL数据库的概念和特点,列举了主流的关系型数据库和NoSQL数据库,同时描述了它们在新闻、电商抢购信息和微博热点信息等场景中的应用。此外,还提供了MySQL配置文件的相关内容。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 本文介绍了sqlserver云存储和本地存储的区别,云存储是将数据存储在网络上,方便查看和调用;本地存储是将数据存储在电脑磁盘上,只能在存储的电脑上查看。同时提供了几种启动sqlserver的方法。此外,还介绍了如何导出数据库的步骤和工具。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文介绍了使用数据库管理员用户执行onstat -l命令来监控GBase8s数据库的物理日志和逻辑日志的使用情况,并强调了对已使用的逻辑日志是否及时备份的重要性。同时提供了监控方法和注意事项。 ... [详细]
  • 移动端常用单位——rem的使用方法和注意事项
    本文介绍了移动端常用的单位rem的使用方法和注意事项,包括px、%、em、vw、vh等其他常用单位的比较。同时还介绍了如何通过JS获取视口宽度并动态调整rem的值,以适应不同设备的屏幕大小。此外,还提到了rem目前在移动端的主流地位。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
author-avatar
ThinkSNS
ThinkSNS(简称TS),一款全平台综合性社交系统,为国内外大中小企业和创业者提供社会化软件研发及技术解决方案,目前最新版本为ThinkSNS+。
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有