client-go工作队列学习(三) - 限速队列
第三篇,限速队列: 等多久?让我找个大仙算算
限速队列
限速队列之前都没听说过,看了接口定义也不懂,直接看限速器的定义了
1 |
|
最开始不是很明白Forget
和NumRequeues
的作用,但是根据When
和限速的语义,是这么猜的,限速队列的入队列接口是AddRateLimited
,When
自动计算延时时间duration
,再安排调用AddAfter(item,duration)
。比如说现在一秒内调用了10次AddRateLimited
,但是可以通过限速器,把对象真正加入队列的时间分配到10s上。AddAfter(obj0, 0)
, AddAfter(obj1, 1)
, AddAfter(obj2, 2)
……这样就把每秒10次限速到了1秒1次。
没错,限速队列的实现上就是调用AddAfter
。duration
的计算靠限速器实现。
1 |
|
所以现在重点是猜一下限速器的实现?具体实现,大概会把每秒钟看成一个盒子,只能放一个Add事件。基于这个思路用Python写了一版:
1 |
|
看上去效果还行。
思路是维护period
长的bucket
窗口,保证当前窗口永远不满,when
时计算当前时间和窗口左边缘的距离,得到等待的时间。然后根据当前时间和窗口的位置关系,往窗口添加事件或者新建窗口再添加事件。时间用的monotonic
,符合这种只关注时间跨度的场景,而pref_counter
精度更高,且忽略线程休眠的时间,没有必要。
但是重入队列该怎么理解?NumRequeues
和Forget
有什么用?我是这么猜的,NumRequeues
是个只读接口,修改它,必然是靠When
来修改。所以限速器内部会有一个字典结构,对每个对象When
一下,就把该对象的NumRequeues+1
,重入队列就是字面意义上的对相同对象调用多次AddRateLimited
,这个动作就是重试,这是用户发起的,并不是限速队列帮你重试。NumRequeues
的值越大,限速器就会把该对象入队列的时间越往后延,等于说限速器帮你实现重试时间backoff的逻辑。而Forget
就是忘记该对象的重试历史,重置backoff的时间序列。
共有如下几个限速器的具体实现:
- BucketRateLimiter
- ItemBucketRateLimiter
- ItemExponentialFailureRateLimiter
- ItemFastSlowRateLimiter
- MaxOfRateLimiter
前三个,看名字也差不多能猜到是什么了。具体说明:
限速器 | 说明 |
---|---|
BucketRateLimiter | 无视对象身份的速率限制(就是我的Python版本) |
ItemBucketRateLimiter | 对每个对象使用一个BucketRateLimiter,限制每个对象的插入速率 |
ItemExponentialFailureRateLimiter | 对每个重试项的指数型backoff控制,backoff = min(base * 2 ^ times, max_delay),实现上要注意防止指数计算溢出 |
ItemFastSlowRateLimiter | 对每个重试项的阶跃型backoff控制,backoff = fast_delay if times < n else slow_delay |
MaxOfRateLimiter | 聚合类型,包装其他Limiter,从中取最长的等待时间,最大重试次数 |
其中BucketRateLimiter是基于 “golang.org/x/time/rate“ 实现的。
看了一下,使用的是名为令牌桶的方案。思路是维护一个容量为quota的桶,每秒往桶里放入n个token,如果满了就不放置。消费者从桶里拿token,拿到表示允许事件发生,拿不到就阻塞或者返回需要等待的时间。
关键动作是:
ReserveN(time time.Time, n int)
表示从time时间点(一般传入当前时间)预定n个token,需要等待多长时间。
n > quota
等待时间就是无限长。返回的是一个名为Reservation
的结构r,r.Delay()可查询等待的时间,ok查询是否n > quota
。该结构一返回,就表示n个token被消费的事实。
在ReserveN
的基础上,就可以派生出其他行为
接口 | 等效 | 用途 |
---|---|---|
Reserve() | ReserveN(now, 1) | 得到时间,自由调度 |
Allow() | AllowN(now, 1) = Reserve(now, 1).ok | 用于高峰直接丢弃过多的请求 |
Wait(context) | WaitN(context, 1) = 阻塞时长Reserve().Delay() | 主动等待,尽量不丢失请求 |
所以使用rate提供的limiter来实现BucketRateLimiter
,就是在When
时调用Reserve().Delay()
,源码就是这样搞的:
1 |
|
总结一下
限速队列,就是通过把操作延时到未来发生,控制后得到某条事件发生曲线,平稳限速或者实现重试时间backoff的指数增长
我猜的限时器实现使用的滑动窗口,go中rate的方案果然更工程化,接口更丰富,更灵活,适用场景更多
至此,三个队列都学习完了,对我这种菜鸡,收获感还是很足
普通队列 (保证相同对象不会被不同消费者处理、Get+Done的接口组合) ->
延时队列 (引入优先队列,高效地协程唤醒策略,基于Add实现AddAfter) ->
限速队列 (引入限速器,自动分配延长时间,基于AddAfter实现AddRateLimit)
分层的设计,复用下层实现,附加电池,创造了更多功能,不愧是你啊阿谷😍
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!