client-go工作队列学习(一) - 普通队列
第一篇,普通队列: 影分身的忍者队列,换个马甲照样认识你
队列作为解耦上下游强依赖关系的常见工具,为应对不同的场景,也诞生了不同的定制化。
K8s中client-go中的工作队列有三种:普通队列、延时队列、限速队列。分层依赖的关系。
每个队列我都会用单独的一篇文章记录。这个系列的主要参考是大佬的博客,会加上我自己的一些理解和补充。这些理解甚至包括,学习过程中我自己对一些设计的猜测,这些猜测甚至是错误的🙃,小心阅读。
普通队列
先看普通的队列的接口
1 |
|
和传统的队列有一个不同,传统队列的接口时是push and pop,但这里pop换成了Get,Get时并不会从队列中删除该元素,而是要显示调用Done之后,才从队列中删除。
我猜这是给处理失败留下了空间。如果直接弹出,而线程处理崩溃,这个任务就永远消失了。所以要让处理逻辑显示通知队列删除该任务。
因此:
- 一个任务被处理逻辑认领处理,而未commit时,应该将该项任务做标记,避免其他逻辑前来认领。
- 还有记录超时的结构,让久未确认的任务重新回到队列。
然后看队列实现时用了什么字段:
1 |
|
现去看了看 Prometheus 的 metrics 概念。
metrics 是时间序列数值,单点的数据模型是:<metric_name>{label1: A, label2: B}=3.1415926 & timestamp=123443242352
可以根据 label 新建分析维度。按照时间序列上的属性变动,可以分为单调递增的 Counter,任意波动的 Gauge,按时间段统计的 Hist,Summary 等等。
所以这里的 metrics 应该是某种统计类型,调用接口来生成一个单点数据,供 Prometheus 拉取。
看这个结构,猜一下实现:
queue保证出入顺序
processing是被认领但未提交的任务
条件变量来维持线程同步,比如队列为空时阻塞消费线程
shuttingdown标记,ShutDown将该标记值为True,之后不再接受新任务,但是可以继续被消费。
可是dirty是什么呢?没有维护超时的结构吗?
再看具体实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34func (q *Type) Add(item interface{}) {
// 条件变量自带了与之搭配的互斥锁,L
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 队列正在关闭,拒绝一切新输入
if q.shuttingDown {
return
}
// 去重,dirty为queue的set,正在queue中还未处理,就丢弃
if q.dirty.has(item) {
return
}
// 告知metrics添加了元素
q.metrics.add(item)
// 添加到脏数据集合中
q.dirty.insert(item)
// 元素刚被拿走处理,那就直接返回
if q.processing.has(item) {
return
}
// 追加到元素数组的尾部,没有大小限制,是无界队列
q.queue = append(q.queue, item)
// 通知有新元素到了,此时有协程阻塞就会被唤醒
q.cond.Signal()
}
反正我是看蒙了,为什么相同元素就不入队列?每个新元素都会先进入dirty,然后用dirty来去重?
感觉这里要把自己以前的认识打碎,现在这个队列是面向应用场景的队列,需要从业务层面判断是否重复,比如这里就是K8s的的api对象。书本上的队列,都是没有业务概念的,每个push的对象都看作不同。
dirty大多数情况下,是queue的set版本,但是注意dirty.insert
和queue.append
并不是完全同步,中间被检查processing隔断:当对象正在被处理,期间又Add相同的对象,只会进入dirty而不会进入queue,多的这一份,会在Done时,检查dirty,补充加入queue。这个处理方式,保证的是正在处理的对象,一定不会出现在queue中。换句话说,同一个对象,不会被被两个逻辑流同时处理。普通实现这个保证,只要把dirty完全做成queue的set版本就可以,也就是把Add中的检查processing提前到dirty insert之前。这样,处理期间到来的对象会被拒绝,只有处理完成后,才能接受该对象的新版本。而当前实现,凡是在开始处理后加入的相同对象,就会被记录,并在处理完成后立即加入queue。窗口变大了,但是不知道为什么要这么做?
一个典型的过程描述如下:
命令 | dirty | processing | queue | 备注 |
---|---|---|---|---|
Add 1 | {1} | {} | [1] | |
Add 2 | {1, 2} | {} | [1, 2] | |
Add 1 | {1, 2} | {} | [1, 2] | |
GET | {2} | {1} | [2] | |
Add 1 | {1, 2} | {1} | [2] | dirty中的1就是处理期间到来的新版本 |
Done 1 | {1, 2} | {} | [2, 1] | 从dirty中拿出处理期间得到的新对象 |
1 |
|
总结一下
- 无界队列,条件变量阻塞消费者
- 队列元素有对象身份的概念,在一个时间,同一个对象只能被一个消费者处理,一个对象,要么在processing中,要么在queue中,这个这个队列的独特性质。
- 通过dirty和queue的差异,提前记录处理过程中出现的相同对象,处理完成后加入queue,此时processing和dirty中有重复对象
- 最后也没有出现超时的处理。所以Get+Done的作用应该不是像我猜的那样,为了消费者崩溃,自动超时并重入队列,而就要是维持第二点的性质——如果没有调用Done,对象就一直存在于processing集合中,这个队列不会再分配其他对象。否则,没有processing的状态,就可能把相同对象分配到另外的消费者里。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!