• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Kubernetesclient-goDeltaFIFO源码分析

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

Replace()

概述

源码版本信息

  • Project: kubernetes
  • Branch: master
  • Last commit id: d25d741c
  • Date: 2021-09-26

我们在《Kubernetes client-go 源码分析 - 开篇》里提到了自定义控制器涉及到的 client-go 组件整体工作流程,大致如下图:

DeltaFIFO 是上面的一个重要组件,今天我们来详细研究下 client-go 里 DeltaFIFO 相关代码。

Queue 接口

类似 workqueue 里的队列概念,这里也有一个队列,Queue 接口定义在 client-go/tools/cache 包中的 fifo.go 文件里,看下有哪些方法:

1type Queue interface {
2   Store
3   Pop(PopProcessFunc) (interface{}, error) // 会阻塞,知道有一个元素可以被 pop 出来,或者队列关闭
4   AddIfNotPresent(interface{}) error
5   HasSynced() bool
6   Close()
7}

这里嵌里一个 Store 接口,对应定义如下:

 1type Store interface {
2   Add(obj interface{}) error
3   Update(obj interface{}) error
4   Delete(obj interface{}) error
5   List() []interface{}
6   ListKeys() []string
7   Get(obj interface{}) (item interface{}, exists bool, err error)
8   GetByKey(key string) (item interface{}, exists bool, err error)
9   Replace([]interface{}, string) error
10   Resync() error
11}

Store 接口的方法都比较直观,Store 的实现有很多,我们等下看 Queue 里用到的是哪个实现。

Queue 接口的实现是 FIFO 和 DeltaFIFO 两个类型,我们在 Informer 里用到的是 DeltaFIFO,而 DeltaFIFO 也没有依赖 FIFO,所以下面我们直接看 DeltaFIFO 是怎么实现的。

DeltaFIFO

  • client-go/tools/cache/delta_fifo.go:97
 1type DeltaFIFO struct {
2   lock sync.RWMutex
3   cond sync.Cond
4   items map[string]Deltas
5   queue []string               // 这个 queue 里是没有重复元素的,和上面 items 的 key 保持一致
6   populated bool
7   initialPopulationCount int
8   keyFunc KeyFunc              // 用于构造上面 map 用到的 key
9   knownObjects KeyListerGetter // 用来检索所有的 keys
10   closed bool
11   emitDeltaTypeReplaced bool
12}

这里有一个 Deltas 类型,看下具体的定义:

 1type Deltas []Delta
2
3type Delta struct {
4    Type   DeltaType
5    Object interface{}
6}
7
8type DeltaType string
9
10const (
11    Added   DeltaType = "Added"
12    Updated DeltaType = "Updated"
13    Deleted DeltaType = "Deleted"
14    Replaced DeltaType = "Replaced"
15    Sync DeltaType = "Sync"
16)

可以看到 Delta 结构体保存的是 DeltaType(就是一个字符串)和发生了这种 Delta 的具体对象。

DeltaFIFO 内部主要维护的一个队列和一个 map,直观一点表示如下:

DeltaFIFO 的 New 函数是 NewDeltaFIFOWithOptions()

  • client-go/tools/cache/delta_fifo.go:218
 1func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
2   if opts.KeyFunction == nil {
3      opts.KeyFunction = MetaNamespaceKeyFunc
4   }
5
6   f := &DeltaFIFO{
7      items:        map[string]Deltas{},
8      queue:        []string{},
9      keyFunc:      opts.KeyFunction,
10      knownObjects: opts.KnownObjects,
11
12      emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
13   }
14   f.cond.L = &f.lock
15   return f
16}

元素增删改 - queueActionLocked()

可以注意到 DeltaFIFO 的 Add() 等方法等方法体都很简短,大致这样:

1func (f *DeltaFIFO) Add(obj interface{}) error {
2   f.lock.Lock()
3   defer f.lock.Unlock()
4   f.populated = true
5   return f.queueActionLocked(Added, obj)
6}

里面的逻辑就是调用 queueActionLocked() 方法传递对应的 DeltaType 进去,前面提到过 DeltaType 就是 Added、Updated、Deleted 等字符串,所以我们直接先看 queueActionLocked() 方法的实现。

  • client-go/tools/cache/delta_fifo.go:409
 1func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
2   id, err := f.KeyOf(obj) // 计算这个对象的 key
3   if err != nil {
4      return KeyError{obj, err}
5   }
6   oldDeltas := f.items[id] // 从 items map 里获取当前的 Deltas
7   newDeltas := append(oldDeltas, Delta{actionType, obj}) // 构造一个 Delta,添加到 Deltas 中,也就是 []Delta 里
8   newDeltas = dedupDeltas(newDeltas) // 如果最近个 Delta 是重复的,则保留后一个;目前版本只处理的 Deleted 重复场景
9
10   if len(newDeltas) > 0 { // 理论上 newDeltas 长度一定大于0
11      if _, exists := f.items[id]; !exists {
12         f.queue = append(f.queue, id) // 如果 id 不存在,则在队列里添加
13      }
14      f.items[id] = newDeltas // 如果 id 已经存在,则只更新 items map 里对应这个 key 的 Deltas
15      f.cond.Broadcast()
16   } else { // 理论上这里执行不到
17      if oldDeltas == nil {
18         klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
19         return nil
20      }
21      klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
22      f.items[id] = newDeltas
23      return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
24   }
25   return nil
26}

到这里再反过来看 Add() Delete() Update() Get() 等函数,就很清晰了,只是将对应变化类型的 obj 添加到队列中。

Pop()

Pop 按照元素的添加或更新顺序有序返回一个元素(Deltas),在队列为空时会阻塞。另外 Pop 过程会先从队列中删除一个元素然后返回,所以如果处理失败了需要通过 AddIfNotPresent() 方法将这个元素加回到队列中。

Pop 的参数是 type PopProcessFunc func(interface{}) error 类型的 process,中 Pop() 函数中直接将队列里的第一个元素出队,然后丢给 process 处理,如果处理失败会重新入队,但是这个 Deltas 和对应的错误信息会被返回。

  • client-go/tools/cache/delta_fifo.go:515
 1func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
2   f.lock.Lock()
3   defer f.lock.Unlock()
4   for { // 这个循环其实没有意义,和下面的 !ok 一起解决了一个不会发生的问题
5      for len(f.queue) == 0 { // 如果为空则进入这个循环
6         if f.closed { // 队列关闭则直接返回
7            return nil, ErrFIFOClosed
8         }
9         f.cond.Wait() // 等待
10      }

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Go-优点发布时间:2022-07-10
下一篇:
阿里云ECS服务器云监控(cloudmonitor)Go语言版本插件安装卸载与维护 ...发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap