client-go workqueue notes (2) - the delaying queue

Part two, the delaying queue: want to get in line? Wait your turn.

Delaying queue

type DelayingInterface interface {
	Interface                                          // embeds every method of the generic queue interface
	AddAfter(item interface{}, duration time.Duration) // the one addition: add an item after a delay
}

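Judging from the interface, my guess is that an item gets added to the queue once `duration` has elapsed. So there is presumably a priority queue, plus a goroutine driven by a timer that keeps checking whether the head element has reached its absolute ready time, and once it has, simply calls the generic queue's Add? For once I guessed the right direction.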

Here is the struct behind the implementation:

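type delayingType struct {
	Interface                     // embeds the generic queue implementation
	clock           clock.Clock   // clock used to read the current time
	stopCh          chan struct{} // exit signal for the priority-queue goroutine
	stopOnce        sync.Once     // ensures the exit signal is sent to that goroutine only once
	heartbeat       clock.Ticker  // ticker that periodically wakes the goroutine to check whether the head is due
	waitingForAddCh chan *waitFor // every delayed item is wrapped in a waitFor and sent on this channel
	metrics         retryMetrics  // plays a role similar to the metrics in the generic queue
}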

waitFor is the element type stored in the priority queue:

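type waitFor struct {
	data    t         // the item itself; t is the interface{} type defined in the generic queue
	readyAt time.Time // absolute time at which the item should be added
	index   int       // position of this entry in the priority queue, needed to update it in place
}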

ShutDown may be called any number of times, but thanks to stopOnce the shutdown work runs only once.

// Shuts down only once
func (q *delayingType) ShutDown() {
	q.stopOnce.Do(func() {
		q.Interface.ShutDown()
		close(q.stopCh)
		q.heartbeat.Stop()
	})
}
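Closing a channel twice panics in Go, which is why `close(q.stopCh)` sits inside `stopOnce.Do`.

The sketch below also shows the other detail of `ShutDown`. The outer type's method shadows the promoted one, so the embedded queue's version has to be called explicitly, as `q.Interface.ShutDown()` does. `inner` and `closer` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// inner is a hypothetical stand-in for the embedded generic queue.
type inner struct{ shutdowns int }

func (i *inner) ShutDown() { i.shutdowns++ }

// closer is a hypothetical wrapper that, like delayingType, embeds the
// inner queue and defines its own ShutDown, shadowing the promoted one.
type closer struct {
	*inner
	stopCh   chan struct{}
	stopOnce sync.Once
}

func (c *closer) ShutDown() {
	c.stopOnce.Do(func() {
		c.inner.ShutDown() // call the embedded type's method explicitly
		close(c.stopCh)    // a second close would panic without sync.Once
	})
}

func main() {
	c := &closer{inner: &inner{}, stopCh: make(chan struct{})}
	c.ShutDown()
	c.ShutDown()             // no-op: the Once has already run
	<-c.stopCh               // receiving from a closed channel returns immediately
	fmt.Println(c.shutdowns) // 1
}
```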

Here is AddAfter; the logic is simple enough.

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
		// stopCh is closed, i.e. shutdown has happened: skip adding the item
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
		// hand the waitFor over to the priority-queue goroutine
	}
}
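The final `select` in `AddAfter` keeps the caller from blocking forever once shutdown has begun. A closed `stopCh` is always ready to receive, so it wins whenever the send cannot proceed.

Below is a sketch of the same pattern, using a hypothetical `trySend` helper and an `int` channel with a hypothetical one-slot buffer.

```go
package main

import "fmt"

// trySend is a hypothetical helper showing the pattern used in AddAfter:
// wait until either the send goes through or stopCh is closed.
func trySend(stopCh <-chan struct{}, ch chan<- int, v int) bool {
	select {
	case <-stopCh:
		return false // shutting down: give up instead of blocking forever
	case ch <- v:
		return true
	}
}

func main() {
	stopCh := make(chan struct{})
	ch := make(chan int, 1)             // one free slot
	fmt.Println(trySend(stopCh, ch, 1)) // true: the buffer had room
	close(stopCh)
	fmt.Println(trySend(stopCh, ch, 2)) // false: buffer full and stopCh closed
}
```

If both cases are ready at once (the channel has room and `stopCh` is already closed), Go picks one of them at random. The send can therefore still succeed after shutdown has started.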

Next comes the priority-queue logic, which still contains plenty of optimizations I would never have thought of.

// waitingLoop runs until the workqueue is shutdown and
// keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	// When the queue is empty, waitingLoop has no reason to wake up on its own: a channel
	// that never receives anything makes that select branch sleep forever.
	never := make(<-chan time.Time)

	// Make a timer that expires when the item at the head of the waiting queue is ready
	// E.g. if the first item becomes ready in 4 minutes and nothing new arrives meanwhile,
	// waitingLoop should wake up after 4 minutes to handle it.
	var nextReadyAtTimer clock.Timer

	// Initialize the priority queue
	waitingForQueue := &waitForPriorityQueue{}
	heap.Init(waitingForQueue)

	// Items (of type t) keep their identity: inserting the same item again only
	// updates its time instead of adding a duplicate entry.
	waitingEntryByData := map[t]*waitFor{}

	for {
		if q.Interface.ShuttingDown() {
			return
		}

		now := q.clock.Now()

		// Add ready entries
		// Awake: move every entry that is already due into the underlying queue
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			if entry.readyAt.After(now) {
				break // the head is not due yet, so no later entry is either
			}

			// Due: pop it and add it to the underlying queue
			entry = heap.Pop(waitingForQueue).(*waitFor)
			q.Add(entry.data)
			// and remove it from the dedup map
			delete(waitingEntryByData, entry.data)
		}

		// Set up a wait for the first item's readyAt (if one exists)
		// All due entries are now in the queue; work out how long until the next one.
		// Start by assuming the queue is empty, i.e. wait forever.
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				// a newly inserted item may be due sooner; stop the previous timer
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor)
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
		case <-q.stopCh:
			return

		case <-q.heartbeat.C():
			// continue the loop, which will add ready items
			// In theory not needed; it is insurance. Leave the select and go back to
			// the top of the loop to add whatever has become due.

		case <-nextReadyAt:
			// continue the loop, which will add ready items
			// The timer set for the head of the queue fired, so the first entry should now be due.

		case waitEntry := <-q.waitingForAddCh:
			// AddAfter was called: a new item has arrived
			if waitEntry.readyAt.After(q.clock.Now()) {
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}

			// Drain every entry already waiting in the channel in one go
			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}

// insert adds an entry to the priority queue
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
	// if the entry already exists, update the time only if it would cause the item to be queued sooner
	// Already queued and the new ready time is earlier: update the time.
	// heap.Fix roughly amounts to a swim plus a sink.
	existing, exists := knownEntries[entry.data]
	if exists {
		if existing.readyAt.After(entry.readyAt) {
			existing.readyAt = entry.readyAt
			heap.Fix(q, existing.index)
		}
		return
	}

	heap.Push(q, entry)
	knownEntries[entry.data] = entry
}

To sum up

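  • The priority queue is not checked by an indiscriminate heartbeat. It is checked whenever an item arrives. Otherwise the loop looks at the head of the queue, works out when the next item becomes ready, and sets its timer for exactly that moment. The loop therefore never wakes up for nothing, which is better for performance.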

    +-------------------------------+
    |                               |
    |                    +---------------------+
    |                    | Add all ready       |
    |                    | elements in pq      |
    |                    | by loop             |
    |                    +---------------------+
    |                               |
    |                    +---------------------+
    |                    | set `nextReadyAt`   |
    |                    | according to the    |
    |                    | head of pq          |
    |                    +---------------------+
    |                               |
    |                               | select on!
    |                               |
    |         +----------------+----+--------+-------------+
    |         |                |             |             |
    |   waitingForAdd      heartbeat    nextReadyAt      stopCh
    |         |                |             |             |
    | +-----------------+      |             |             |
    | | insert into pq  |      |             |             |
    | | or change val   |      |             |             |
    | | in pq by loop   |      |             |           break!
    | +-----------------+      |             |             |
    |         |                |             |             v
    |         +----------------+-------------+
    |                          |
    +--------------------------+
  • The heartbeat, which I had misread, is really just engineering insurance. It fires every 10 s (maxWait), and its comment says:

    // maxWait keeps a max bound on the wait time. It’s just insurance against weird things happening. Checking the queue every 10 seconds isn’t expensive and we know that we’ll never end up with an expired item sitting for more than 10 seconds.

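  • To modify an element and restore the ordering, Go simply hands you heap.Fix, which is refreshingly down-to-earth. Practically every priority queue is backed by an array. Yet the more academic libraries insist they guarantee nothing about the underlying layout: you only get push and pop, no in-place updates, and updates are supposed to need some extra data structure. Python has no public equivalent, but its internal functions can stand in for one. Rust also has sift_up and friends, but they are private and cannot be called.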

import heapq
from heapq import heapify, heappop
from random import randint

def heapfix(heap, idx):
    """Restore the heap invariant after heap[idx] was changed in place."""
    if idx >= len(heap):
        raise IndexError(idx)
    heapq._siftdown(heap, 0, idx)  # private CPython helper: try swimming toward the root
    heapq._siftup(heap, idx)       # private CPython helper: try sinking toward the leaves

def test_fix():
    heap = [randint(0, 100) for _ in range(50)]
    heapify(heap)
    idx, val = randint(0, 49), randint(-20, 120)
    heap[idx] = val
    heapfix(heap, idx)
    pop_order = [heappop(heap) for _ in range(len(heap))]
    return all(x <= y for (x, y) in zip(pop_order, pop_order[1:]))