client-go工作队列学习(三) - 限速队列

第三篇,限速队列: 等多久?让我找个大仙算算

限速队列

限速队列之前都没听说过,看了接口定义也不懂,直接看限速器的定义了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// client-go/util/workqueue/rate_limiting_queue.go

type RateLimitingInterface interface {

DelayingInterface // 组合延时队列
AddRateLimited(item interface{}) // 按照限速方式添加元素的接口
Forget(item interface{}) // 丢弃指定元素
NumRequeues(item interface{}) int // 查询元素放入队列的次数

}

// 限速队列的实现
type rateLimitingType struct {
DelayingInterface // 组合延迟队列
rateLimiter RateLimiter // 限速器
}

//client-go/blob/master/util/workqueue/default_rate_limiters.go
type RateLimiter interface {
When(item interface{}) time.Duration // 返回元素item需要等待多长时间
Forget(item interface{}) // 从限速器中丢弃该元素,比如元素重入队列的次数该清零
NumRequeues(item interface{}) int // 元素重入队列的次数
}

最开始不是很明白ForgetNumRequeues的作用,但是根据When和限速的语义,是这么猜的,限速队列的入队列接口是AddRateLimitedWhen自动计算延时时间duration,再安排调用AddAfter(item,duration)。比如说现在一秒内调用了10次AddRateLimited,但是可以通过限速器,把对象真正加入队列的时间分配到10s上。AddAfter(obj0, 0), AddAfter(obj1, 1), AddAfter(obj2, 2)……这样就把每秒10次限速到了1秒1次。

没错,限速队列的实现上就是调用AddAfterduration的计算靠限速器实现。

1
2
3
4
5
6
7
8
9
10
11
12
func (q *rateLimitingType) AddRateLimited(item interface{}) {
// 通过限速器获取延迟时间,然后加入到延时队列
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}

所以现在重点是猜一下限速器的实现?具体实现,大概会把每秒钟看成一个盒子,只能放一个Add事件。基于这个思路用Python写了一版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import time


class BucketRateLimiter:
def __init__(self, period, quota):
# capacity for every bucket
self.b_quota = quota
# current bucket start postion
self.b_pos = self.tick()
# current bucket span
self.b_span = period
# how many items in current bucket
self.b_cnt = 0

def tick(self):
# return fractional seconds
return time.monotonic()

def inc_current_bucket(self):
self.b_cnt += 1
if self.b_cnt >= self.b_quota:
# use next bucket
self.new_bucket_at(self.b_pos+self.b_span)

def new_bucket_at(self, pos):
self.b_pos = pos
self.b_cnt = 0

def when(self, _obj):
now = time.monotonic()
# at any time, we ensure that the current bucket is not full
# so we can calculate `delay` first
delay = max(0, self.b_pos - now)
if now > self.b_pos + self.b_span:
self.new_bucket_at(now)
self.inc_current_bucket()
else:
self.inc_current_bucket()
return delay

def forget(self, _obj):
pass

def num_requeues(self, _obj):
pass


if __name__ == "__main__":
rate_limiter = BucketRateLimiter(1, 4)
for _ in range(10):
print(rate_limiter.when(None))

看上去效果还行。

思路是维护period长的bucket窗口,保证当前窗口永远不满,when时计算当前时间和窗口左边缘的距离,得到等待的时间。然后根据当前时间和窗口的位置关系,往窗口添加事件或者新建窗口再添加事件。时间用的monotonic,符合这种只关注时间跨度的场景,而pref_counter精度更高,且忽略线程休眠的时间,没有必要。

但是重入队列该怎么理解?NumRequeuesForget有什么用?我是这么猜的,NumRequeues是个只读接口,修改它,必然是靠When来修改。所以限速器内部会有一个字典结构,对每个对象When一下,就把该对象的NumRequeues+1,重入队列就是字面意义上的对相同对象调用多次AddRateLimited,这个动作就是重试,这是用户发起的,并不是限速队列帮你重试。NumRequeues的值越大,限速器就会把该对象入队列的时间越往后延,等于说限速器帮你实现重试时间backoff的逻辑。而Forget就是忘记该对象的重试历史,重置backoff的时间序列。

共有如下几个限速器的具体实现:

  • BucketRateLimiter
  • ItemBucketRateLimiter
  • ItemExponentialFailureRateLimiter
  • ItemFastSlowRateLimiter
  • MaxOfRateLimiter

前三个,看名字也差不多能猜到是什么了。具体说明:

限速器 说明
BucketRateLimiter 无视对象身份的速率限制(就是我的Python版本)
ItemBucketRateLimiter 对每个对象使用一个BucketRateLimiter,限制每个对象的插入速率
ItemExponentialFailureRateLimiter 对每个重试项的指数型backoff控制,backoff = min(base * 2 ^ times, max_delay),实现上要注意防止指数计算溢出
ItemFastSlowRateLimiter 对每个重试项的阶跃型backoff控制,backoff = fast_delay if times < n else slow_delay
MaxOfRateLimiter 聚合类型,包装其他Limiter,从中取最长的等待时间,最大重试次数

其中BucketRateLimiter是基于 “golang.org/x/time/rate“ 实现的。

看了一下,使用的是名为令牌桶的方案。思路是维护一个容量为quota的桶,每秒往桶里放入n个token,如果满了就不放置。消费者从桶里拿token,拿到表示允许事件发生,拿不到就阻塞或者返回需要等待的时间。

关键动作是:

ReserveN(time time.Time, n int)

表示从time时间点(一般传入当前时间)预定n个token,需要等待多长时间。

n > quota等待时间就是无限长。返回的是一个名为Reservation的结构r,r.Delay()可查询等待的时间,ok查询是否n > quota。该结构一返回,就表示n个token被消费的事实。

ReserveN的基础上,就可以派生出其他行为

接口 等效 用途
Reserve() ReserveN(now, 1) 得到时间,自由调度
Allow() AllowN(now, 1) = Reserve(now, 1).ok 用于高峰直接丢弃过多的请求
Wait(context) WaitN(context, 1) = 阻塞时长Reserve().Delay() 主动等待,尽量不丢失请求

所以使用rate提供的limiter来实现BucketRateLimiter,就是在When时调用Reserve().Delay(),源码就是这样搞的:

1
2
3
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay()
}

总结一下

  • 限速队列,就是通过把操作延时到未来发生,控制后得到某条事件发生曲线,平稳限速或者实现重试时间backoff的指数增长

  • 我猜的限时器实现使用的滑动窗口,go中rate的方案果然更工程化,接口更丰富,更灵活,适用场景更多

至此,三个队列都学习完了,对我这种菜鸡,收获感还是很足

普通队列 (保证相同对象不会被不同消费者处理、Get+Done的接口组合) ->

延时队列 (引入优先队列,高效地协程唤醒策略,基于Add实现AddAfter) ->

限速队列 (引入限速器,自动分配延长时间,基于AddAfter实现AddRateLimit)

分层的设计,复用下层实现,附加电池,创造了更多功能,不愧是你啊阿谷😍