client-go工作队列学习(一) - 普通队列

第一篇,普通队列: 影分身的忍者队列,换个马甲照样认识你

队列作为解耦上下游强依赖关系的常见工具,为应对不同的场景,也诞生了不同的定制化。

K8s中client-go中的工作队列有三种:普通队列、延时队列、限速队列。分层依赖的关系。

每个队列我都会用单独的一篇文章记录。这个系列的主要参考是大佬的博客,会加上我自己的一些理解和补充。这些理解甚至包括,学习过程中我自己对一些设计的猜测,这些猜测甚至是错误的🙃,小心阅读。

普通队列

先看普通的队列的接口

1
2
3
4
5
6
7
8
9
10
// client-go/util/workqueue/queue.go

type Interface interface {
Add(item interface{}) // 向队列中添加一个元素,interface{}类型,说明可以添加任何类型的元素
Len() int // 元素的个数
Get() (item interface{}, shutdown bool) // 从队列中获取一个元素,第二个返回值告知队列是否已经关闭
Done(item interface{}) // 告知队列该元素已经处理完了
ShutDown() // 关闭队列
ShuttingDown() bool // 查询队列是否正在关闭
}

和传统的队列有一个不同,传统队列的接口时是push and pop,但这里pop换成了Get,Get时并不会从队列中删除该元素,而是要显示调用Done之后,才从队列中删除。

我猜这是给处理失败留下了空间。如果直接弹出,而线程处理崩溃,这个任务就永远消失了。所以要让处理逻辑显示通知队列删除该任务。

因此:

  1. 一个任务被处理逻辑认领处理,而未commit时,应该将该项任务做标记,避免其他逻辑前来认领。
  2. 还有记录超时的结构,让久未确认的任务重新回到队列。

然后看队列实现时用了什么字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// client-go/util/workqueue/queue.go
type Type struct {
queue []t // 元素数组
dirty set // dirty的元素集合
processing set // 正在处理的元素集合
cond *sync.Cond // 自带互斥锁的条件变量
shuttingDown bool // 关闭标记
metrics queueMetrics // prometheus的metrics概念相同*
}

// 以下的这些类型定义为了用map凹出一个set类型,等用于python的set和Rust的HashSet
type empty struct{} // 空类型,因为sizeof(struct{})=0
type t interface{} // 任意元素类型
type set map[t]empty // 用map实现的set

现去看了看 Prometheus 的 metrics 概念。
metrics 是时间序列数值,单点的数据模型是:
<metric_name>{label1: A, label2: B}=3.1415926 & timestamp=123443242352
可以根据 label 新建分析维度。按照时间序列上的属性变动,可以分为单调递增的 Counter,任意波动的 Gauge,按时间段统计的 Hist,Summary 等等。
所以这里的 metrics 应该是某种统计类型,调用接口来生成一个单点数据,供 Prometheus 拉取。

看这个结构,猜一下实现:

  • queue保证出入顺序

  • processing是被认领但未提交的任务

  • 条件变量来维持线程同步,比如队列为空时阻塞消费线程

  • shuttingdown标记,ShutDown将该标记值为True,之后不再接受新任务,但是可以继续被消费。

可是dirty是什么呢?没有维护超时的结构吗?

再看具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (q *Type) Add(item interface{}) {
// 条件变量自带了与之搭配的互斥锁,L
q.cond.L.Lock()

defer q.cond.L.Unlock()

// 队列正在关闭,拒绝一切新输入
if q.shuttingDown {
return
}

// 去重,dirty为queue的set,正在queue中还未处理,就丢弃
if q.dirty.has(item) {
return
}

// 告知metrics添加了元素
q.metrics.add(item)

// 添加到脏数据集合中
q.dirty.insert(item)

// 元素刚被拿走处理,那就直接返回
if q.processing.has(item) {
return
}

// 追加到元素数组的尾部,没有大小限制,是无界队列
q.queue = append(q.queue, item)

// 通知有新元素到了,此时有协程阻塞就会被唤醒
q.cond.Signal()
}

反正我是看蒙了,为什么相同元素就不入队列?每个新元素都会先进入dirty,然后用dirty来去重?

感觉这里要把自己以前的认识打碎,现在这个队列是面向应用场景的队列,需要从业务层面判断是否重复,比如这里就是K8s的的api对象。书本上的队列,都是没有业务概念的,每个push的对象都看作不同。

dirty大多数情况下,是queue的set版本,但是注意dirty.insertqueue.append并不是完全同步,中间被检查processing隔断:当对象正在被处理,期间又Add相同的对象,只会进入dirty而不会进入queue,多的这一份,会在Done时,检查dirty,补充加入queue。这个处理方式,保证的是正在处理的对象,一定不会出现在queue中。换句话说,同一个对象,不会被被两个逻辑流同时处理。普通实现这个保证,只要把dirty完全做成queue的set版本就可以,也就是把Add中的检查processing提前到dirty insert之前。这样,处理期间到来的对象会被拒绝,只有处理完成后,才能接受该对象的新版本。而当前实现,凡是在开始处理后加入的相同对象,就会被记录,并在处理完成后立即加入queue。窗口变大了,但是不知道为什么要这么做?

一个典型的过程描述如下:

命令 dirty processing queue 备注
Add 1 {1} {} [1]
Add 2 {1, 2} {} [1, 2]
Add 1 {1, 2} {} [1, 2]
GET {2} {1} [2]
Add 1 {1, 2} {1} [2] dirty中的1就是处理期间到来的新版本
Done 1 {1, 2} {} [2, 1] 从dirty中拿出处理期间得到的新对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.metrics.done(item)
q.processing.delete(item)

// 此处判断脏元素集合,看看处理期间是不是又被添加,如果是那就在放到队列中
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}

func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

// 没有数据,阻塞协程
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}

// 协程被激活但还没有数据,说明队列被关闭了
if len(q.queue) == 0 {
return nil, true
}

// 弹出第一个元素
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)

// 从dirty集合中移除,加入到processing集合
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}

func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.shuttingDown = true
// 关闭时通知全部可能阻塞的携程,起来后发现queue为0就自己退出
q.cond.Broadcast()
}

总结一下

  • 无界队列,条件变量阻塞消费者
  • 队列元素有对象身份的概念,在一个时间,同一个对象只能被一个消费者处理,一个对象,要么在processing中,要么在queue中,这个这个队列的独特性质。
  • 通过dirty和queue的差异,提前记录处理过程中出现的相同对象,处理完成后加入queue,此时processing和dirty中有重复对象
  • 最后也没有出现超时的处理。所以Get+Done的作用应该不是像我猜的那样,为了消费者崩溃,自动超时并重入队列,而就要是维持第二点的性质——如果没有调用Done,对象就一直存在于processing集合中,这个队列不会再分配其他对象。否则,没有processing的状态,就可能把相同对象分配到另外的消费者里。