// don't add if we're already shutting down if q.ShuttingDown() { return }
q.metrics.retry() // immediately add things with no delay if duration <= 0 { q.Add(item) return }
select { case <-q.stopCh: // unblock if ShutDown() is called // 可以从stopCh中读出,说明关闭事件发生,直接跳过后面的添加 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: // 向优先队列发送waitFor } }
// waitingLoop runs until the workqueue is shutdown and // keeps a check on the list of items to be added. func(q *delayingType)waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list // 队列中没有元素时,这个watiloop实际上不用醒来,这里的具体做法就是创建一个永远不会有输入的chan, // select就永远睡去了 never := make(<-chan time.Time)
// Make a timer that expires when the item at the head of the waiting queue is ready // 假如,队列中第一个事件开始于4min后,而且期间新元素插入,waitloop应该4min之后再醒来执行事件 var nextReadyAtTimer clock.Timer
// 插入优先队列 funcinsert(q waitForPriorityQueue, knownEntries map[t]waitFor, entry *waitFor) { // if the entry already exists, update the time only if it would cause the item to be queued sooner // 已在队列中,并且生效时间提前,更新时间。Heap.Fix,大概相当于swim和sink吧 existing, exists := knownEntries[entry.data] if exists { if existing.readyAt.After(entry.readyAt) { existing.readyAt = entry.readyAt heap.Fix(q, existing.index) } return } heap.Push(q, entry) knownEntries[entry.data] = entry }
+--------------------------------------+ | | | +------------------+ | | Add all ready | | | elements in pq | | | by loop | | +------------------+ | | | | | +------------------+ | | set `nextReadyAt`| | | according to the | | | head of pq | | +------------------+ | | | | select on! | | | +------------------------+-----------------+-------------+ | | | | | | waitingForAdd heartbeat nextReadyAt stopCh | | | | | | +------------------+ | | | | | insert into pq | | | | | | or change val | | | | | | in pq by loop | | | | | +------------------+ | | | break! | | | | | | +------------------------+-----------------+ | | | | +--------------------------------------+ v
而误解的heartbeat,实际上是工程上的保险,时间隔为10s(maxWait),其注释写道:
// maxWait keeps a max bound on the wait time. It’s just insurance against weird things happening. Checking the queue every 10 seconds isn’t expensive and we know that we’ll never end up with an expired item sitting for more than 10 seconds.
defheapfix(heap, idx): if idx >= len(heap): raise IndexError heapq._siftdown(heap, 0, idx) # try swimming to tree top heapq._siftup(heap, idx) # try sinking to tree leaf
deftest_fix(): heap = [randint(0, 100) for _ inrange(50)] heapify(heap) idx, val = randint(0,49), randint(-20, 120) heap[idx] = val heapfix(heap, idx) pop_order = [heappop(heap) for _ inrange(len(heap))] returnall(x <= y for (x, y) inzip(pop_order, pop_order[1:]))