常见限流方法总结

2023-02-01 0 885

序言

开闭的其原因就不多说了,一句话,为的是避免服务项目被高QPS击垮。在听第四组撷取A服务项目的这时候,看见我们探讨开闭的思路,当中说到副本桶,又说到了漏桶,以后只介绍副本桶的价值观,漏桶没是不是介绍过,趁此次把开闭方式归纳下,因此把同时实现技术细节通通介绍,称得上两个小的自学。此次主要就说的开闭方式主要包括单纯开闭、漏桶、副本桶。

单纯开闭

说下他们开闭的路子,允诺的速率他们这儿主要是以每秒钟处置允诺数目QPS为依据,简而言之开闭是我的USB承若的QPS最低每秒钟处置1W条允诺,当允诺少于1W的这时候,他们要把低于1W的允诺给婉拒掉,只处置那当中1W条。他们先来看一看单纯的开闭怎样做呢?话不多说,看标识符

// RateLimiter 速率限制器

type RateLimiter struct { rate uint64 //开闭的速率 allowance uint64 //桶余下的耗电量 max uint64 //桶最小耗电量 unit uint64 //一场允诺的模块天数片 lastCheck uint64 //那次初始化的天数点 }

看一看建立速率限制器的标识符

// New 建立RateLimiter示例 func New(rate int, per time.Duration) *RateLimiter { nano := uint64(per) if nano < 1 { nano = uint64(time.Second) } if rate < 1 { rate = 1 } return &RateLimiter{ rate: uint64(rate), allowance: uint64(rate) * nano, max: uint64(rate) * nano, unit: nano, lastCheck: unixNano(), } }

分析下标识符,函数传进来的rate参数是限定处置的最小允诺速率(两个天数周期内),per指的是两个天数周期的长度,比如rate为100,per为2*time.Second 则代表,他们的最小速率是2秒钟处置100次允诺。nano是把传进来的per转成了int64的整数,看起来是精确到了纳秒级别。allowance初始化为rate*nano好理解,因为刚开始桶是空的。

ok,他们来看这个开闭器是怎样使用

func main() { // rate limit: simple rl := simpleratelimit.New(10, time.Second) for i := 0; i < 100; i++ { log.Printf(“limit result: %v\n”, rl.Limit()) } }

很单纯,他们建立两个每秒钟钟最低10次的开闭器,然后接着初始化开闭器的Limit()函数,来判定此次允诺是否需要被限制,返回true则婉拒,返回false则处置。ok,他们来看下Limit函数

// Limit 判断是否少于限制 func (rl *RateLimiter) Limit() bool { now := unixNano() passed := now atomic.SwapUint64(&rl.lastCheck, now) rate := atomic.LoadUint64(&rl.rate) current := atomic.AddUint64(&rl.allowance, passed*rate) if max := atomic.LoadUint64(&rl.max); current > max { atomic.AddUint64(&rl.allowance, maxcurrent) current = max } if current < rl.unit { return true } // 没少于限额 atomic.AddUint64(&rl.allowance, rl.unit) return false }

分析下过程:

1.首先now为当前天数点,passed为当前天数减去那次允诺的天数点,即为距离那次允诺过去天数,注意SwapUint64顺带更新了lastCheck为当前天数。

2.接着读取下速率rate

3.然后计算current=passed*rate,这儿很有意思,刚开始还没看懂,实际上你理解下,passed是距离那次的天数,rate是速率,相乘代表什么?等会解释。

4.往下看有两个if判断,先读取桶的最小耗电量,然后比较current与max的大小,当大于max的这时候,更新allowance,他们知道这这时候实际上current是等于更新allowance的,更新的方式

atomic.AddUint64(&rl.allowance, maxcurrent)

这不是把allowance更新为max了吗?然后把current=max。

到这儿,基本已经明白了,上一步的

current := atomic.AddUint64(&rl.allowance, passed*rate)

实际上在干什么,其实是在往桶里放水的过程,这个水你可以认为是一段天数(实际天数乘以速率rate),current是放进去这段天数之后的当前桶的耗电量。而这一步是为的是判断放进去的水是否少于桶的最小耗电量,当少于了,则更正为max。

5.接下来,判断当前耗电量current是否低于rl.unit,rl.unit是什么,不是一场允诺需要的天数吗?如果当前耗电量根本不够一场的允诺,必然需要给开闭的,所以返回true

6.如果current够一场允诺,则更显当前的配额,自然是减去一场允诺的天数。

最后放出余下的小部分标识符

// UpdateRate 更新速率值func (rl *RateLimiter) UpdateRate(rate int) { atomic.StoreUint64(&rl.rate, uint64(rate)) atomic.StoreUint64(&rl.max, uint64(rate)*rl.unit) } // Undo 重置上一场初始化Limit(),返回没使用过的限额 func (rl *RateLimiter) Undo() { current := atomic.AddUint64(&rl.allowance, rl.unit) if max := atomic.LoadUint64(&rl.max); current > max { atomic.AddUint64(&rl.allowance, maxcurrent) } } // unixNano 当前天数(纳秒) func unixNano() uint64 { return uint64(time.Now().UnixNano()) }

UpdateRate当然是更新rate了,不多说。Undo比较有意思了,把最近那次初始化使用的水又给放回去了,什么情况下会用到呢?假设你此次允诺发现使用方初始化的参数出错,实际上不占用你的下游QPS,你可以再把用的水放回去嘛。

接下来,他们思考下:

1.实际上这个同时实现方式的哲学价值观是什么呢?我每次桶里放水是允诺触发的,并不是他们异步的往桶里防水,会不会放水不及时呢?不会的,因为他们是先往桶里放水再判断是否开闭的。

2.为什么要把max设置为他们的rate*nano?他们不能把桶的最小耗电量设置的更大些吗?假设我把桶设置为100*nano会出现什么问题呢?

当桶初始化的这时候,桶的耗电量可以允许100次的允诺,假设瞬间过来了100个允诺,根据标识符是不会限额的,好吧看来rate主要是在限制桶的最小耗电量来达到速率的开闭。

3.你看Limit里面对开闭器的操作并没加锁,实际应用中他们的允诺必然是高并发的,这种情况会出现问题吗?

我认为有问题,比如他们再放入水更新allowance的这时候,

if max := atomic.LoadUint64(&rl.max); current > max { atomic.AddUint64(&rl.allowance, maxcurrent) current = max }

如果不做加锁,必然会可能造成两个协程先后判定if为true,接着先后都进入更新allowance,而两个都减去max-current会使得桶的耗电量小于max。

所以最好对桶的这块的操作加上锁,另外Undo的更新allowance同样需要,有必要对整个函数开始加锁,函数结束解锁吗?我觉得不必要,技术细节读者可以自行思考。

漏桶

漏桶又是什么价值观呢?单纯来说一句话,是你入水(允诺的速率)的速率无简而言之多大,我桶漏出水(处置)的速率是固定的,相信我们在网上也看见过很多漏桶的原理图,不再啰嗦,他们直接上标识符,以下标识符是Uber的开源库。

首先定义两个Limiter的USB

type Limiter interface { // Take should block to make sure that the RPS is met. Take() time.Time }

还有两个clock的USB,先放出来,稍后解释

type Clock interface { Now() time.Time Sleep(time.Duration) }

然后他们给出它的开闭器的结构体

type limiter struct { sync.Mutex last time.Time //那次允诺的天数 sleepFor time.Duration //此次要允诺要sleep的天数 perRequest time.Duration //根据限定的速率得出的每次允诺分得的天数片 maxSlack time.Duration //稍后解释 clock Clock //时钟(稍后解释) }

ok,他们来看下建立两个开闭器的函数

func New(rate int, opts Option) Limiter { l := &limiter{ perRequest: time.Second / time.Duration(rate), maxSlack: 10 * time.Second / time.Duration(rate), } for _, opt := range opts { opt(l) } if l.clock == nil { l.clock = clock.New() } return l }

分析下过程:

1.首先他们的perRequest等于1秒钟所包含的纳秒数/他们设定的速率rate,这个好理解

2.maxSlack为-10乘以perRequest,这是为什么呢?他们稍后说。

3.接着,函数传进来了两个Option类型的数组,貌似Option还是函数类型,他们循环初始化这些函数。

type Option func(l *limiter)

嗯,这些函数应该是给开闭器进行配置的。

4.然后建立了两个clock对象,他们稍后看下这个clock的用处是什么。

接下来,他们看一看它的Take函数

func (t *limiter) Take() time.Time { t.Lock() defer t.Unlock() now := t.clock.Now() if t.last.IsZero() { t.last = now return t.last } t.sleepFor += t.perRequest now.Sub(t.last) if t.sleepFor < t.maxSlack { t.sleepFor = t.maxSlack } if t.sleepFor > 0 { t.clock.Sleep(t.sleepFor) t.last = now.Add(t.sleepFor) t.sleepFor = 0 } else { t.last = now } return t.last }

分析下:

1.它加锁了,看起来符合他们的预期

2.首先读取t.clock.Now,ok他们还是来看一看这个clock到底是什么玩意

type clock struct{} func New() Clock { return &clock{} } func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) } func (c *clock) AfterFunc(d time.Duration, f func()) { // TODO maybe return timer interface time.AfterFunc(d, f) } func (c *clock) Now() time.Time { return time.Now() } func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }

t.clock.Now不是time.Now()嘛,当前天数点,很好奇干嘛要包一层

3.接下来有两个if判断,t.last.IsZero() 如果为true的话,则把更新t.last为now,然后直接返回t.last,他们先看下IsZero函数

func (t Time) IsZero() bool { return t.sec == 0 && t.nsec == 0 }

sec和nsec,这个time的天数点在公元第1年的00:00:00啊,注意看上面初始化他们并没初始化last,last是time.Time类型,默认初始化为0嘛,可不就IsZero()为true,为的是不放心,我写了个小测试标识符验证了下,当使用默认初始化time.Time的这时候,IsZero()确实为true。

同时,这也符合Uber官方给这段标识符的解释,说当第一场允诺的这时候直接返回。

4.接着更新了两个sleepFor,看起来好像是他们要睡多长天数的意思,看下它的计算方式

t.perRequest now.Sub(t.last)

perRequest是每次允诺需要的天数片,now.Sub(t.last)是此次允诺距离那次允诺的天数差,喔,到这儿他们称得上明白Uber是是不是同时实现漏桶的了,我知道每次允诺需要的天数,我根据你那次允诺距离此次的天数差,如果到了天数,我就返回,如果没到,我就让你补上天数,是不是补,我让你睡一会。

注意:更新t.sleepFor的这时候,不是直接=,而是+=,为什么?因为以后t.sleepFor本身可能为负数,比如那次的允诺等了很长天数了,使得当时的sleepFor为很大的负数,此次+=之后还为负数,他们继续直接返回嘛,这样符合他们流出的速率要固定的道理。

5.接下来有两个判断

if t.sleepFor < t.maxSlack { t.sleepFor = t.maxSlack }

注意,maxSlack用到了吧,sleepFor比maxSlack还小的这时候,他们就让sleepFor赋值为maxSlack,他们回头看下maxSlack等于什么?

maxSlack: 10 * time.Second / time.Duration(rate),

又明白了, 它的意思是sleepFor不能太负,你只能负到-10*一场允诺的天数片,为什么呢?你想想,如果我某一场允诺距离那次允诺隔得天数特别长,此时

t.perRequest now.Sub(t.last)

会负的特别大,那么后面如果突然有激增的流量过来,t.sleepFor +=之后还是负数,直接放过了流量,根本达不到开闭的作用,所以,他们把它控制在了-10倍,这个-10应该可以调节,目的是使得他们总的水的流速达到他们的要求,又不能有激增流量过来扛不住。

6.接下来就好理解了,sleepFor为正代表他们要睡会,该睡多长天数睡多长天数。

然后更新last,睡完了清理下sleepFor为0,如果小于0就代表不需要睡,更新下last

7.看起来clock是time.Time包了一层而已。

7.最后返回当前的天数。

最后,Uber给出了两个测试的用例

func TestExampleRatelimit(t *testing.T) { rl := New(100) // per second prev := time.Now() for i := 0; i < 10; i++ { now := rl.Take() if i > 0 { fmt.Println(i, now.Sub(prev)) } prev = now } // Output: // 1 10ms // 2 10ms // 3 10ms // 4 10ms // 5 10ms // 6 10ms // 7 10ms // 8 10ms // 9 10ms }

很单纯,建立两个100QPS的开闭器,然后每次初始化Take,发现每次允诺的间隔都是10ms。

副本桶

漏桶有两个不好的地方,是不能处置那种激增的流量,因为流水的速率是固定的,即使资源够用的情况下,处置速率也不会变。于是就有了副本桶,副本桶其实跟他们第一种单纯开闭的路子一样,每次允诺的这时候都会往里面放副本,然后再从桶里取副本,桶有一定的耗电量限制,当桶里没副本的这时候则产生开闭。看标识符

// TokenBucket 副本桶 type TokenBucket struct { lastModifiedTime int64 // 那次修改天数 storedTokens uint64 // 桶中存储的副本数 count uint64 // 每inter天数内产生count个副本 inter int64 // 产生count个副本的天数 maxTokens uint64 // 最小的副本数 sync.RWMutex }

他们来看下怎样建立两个开闭器的

func New(count uint64, inter time.Duration, maxTokens uint64, tokensNow uint64, startTime time.Time) *TokenBucket { return &TokenBucket{ count: count, inter: inter.Nanoseconds(), maxTokens: maxTokens, storedTokens: tokensNow, lastModifiedTime: startTime.UnixNano(), } }

解释下:

每隔inter天数产生count个副本,桶内最多存储maxTokens个副本,初始状态从startTime开始,桶内有tokensNow个副本。

他们来看下源码中给出的两个测试用例,他们随着测试用例看

func TestTokenBucketReserve(t *testing.T) { startTime := time.Now() bucket := New(3, time.Second, 5, 0, startTime) if bucket.ReserveWithTime(1, startTime) { t.Fatalf(“there should be 0 token at %s”, startTime.String()) } t1 := startTime.Add(time.Second) if !bucket.ReserveWithTime(3, t1) { t.Fatalf(“there should be 3 tokens after 1 second”) } // 此时已经没了 if bucket.ReserveWithTime(1, t1) { t.Fatal(“there should be 0 tokens after reserving 3 tokens”) } // 2秒后有5个token t1 = t1.Add(2 * time.Second) if !bucket.ReserveWithTime(5, t1) { t.Fatalf(“there should be 5 tokens after 2 seconds”) } // 此时已经没了 if bucket.ReserveWithTime(1, t1) { t.Fatal(“there should be 0 tokens after reserving all tokens”) } }

分析下:

1.首先建立两个副本桶,每秒钟产生3个,最多可存5个,在startTime时桶内只有0个token,startTime是当前天数。

func (b *TokenBucket) ReserveWithTime(count uint64, now time.Time) bool { if count <= 0 { return true } b.Lock() b.sync(now.UnixNano()) storedTokens := b.storedTokens if storedTokens < count { b.Unlock() return false } b.storedTokens -= count b.Unlock() return true }

2).同样对开闭器的操作也是加锁的

3).初始化sync函数,把now传进去,他们看下在干嘛

func (b *TokenBucket) sync(nowNano int64) { diff := nowNano b.lastModifiedTime if diff < 0 { return } tokensToPut := uint64(diff/b.inter) * b.count if tokensToPut < 1 { return } if sum, e := b.checkedAddUint64(b.storedTokens, tokensToPut); e == nil { if sum > b.maxTokens { sum = b.maxTokens } b.storedTokens = sum } else { return } b.lastModifiedTime = nowNano return }

首先diff即为此次允诺距离那次允诺的天数,判断了两个小于0,会出现这种情况吗?其实是会的,比如当允诺很多的这时候,很多允诺与由于锁的其原因,被序列化在了锁等待队列里,当锁释放时,当时传进去的time必定比当前天数要小,所以这也恰恰表明了,哪个协程先抢着锁了,哪个协程放副本。接着计算了下需要放入桶中的副本数,diff/b.inter等于diff有多少个周期,count是两个周期放多少个副本。如果不够两个副本则直接返回,注意这儿都没更新lastModifiedTime,因为此次虽然没够两个副本,下次可能够呢,要攒着。接下来初始化了checkedAddUint64,看下这个函数在干什么

// sum = a + b,如果溢出,err不为nil。 func (b *TokenBucket) checkedAddUint64(n1, n2 uint64) (sum uint64, err error) { sum = n1 + n2 if !(((n1 ^ n2) < 0) || ((n1 ^ sum) >= 0)) { err = ErrOverflow } return }

函数的作用是更新副本总数因此检查了下总数是否溢出,如果溢出则返回错误,我认为这儿有错误,n1, n2,sum 都是uint64的整数,

if !(((n1 ^ n2) < 0) || ((n1 ^ sum) >= 0))

永远是false,我认为应该是写成

if sum > n2

比较好,如果溢出了肯定是会小于n2的。这是我个人,如果你有不同意见,可以一起探讨下。

ok,他们继续上面的标识符,更新完总的副本数后,要检测下是否少于最小副本数了,如果少于了则改为最小副本数,这个好理解。最后把lastModifiedTime更新为当前天数。

5).sync简而言之同步的意思是更新了lastModifiedTime,因此放入了一定数目的副本。接下来就单纯了,读取 b.storedTok

基本上副本桶的主要就标识符他们看完了,他们看下其他几个辅助的函数

func (b *TokenBucket) SetRate(count uint64, inter time.Duration) { b.Lock() // 先将状态更新到当前天数,然后在设置速率 b.sync(time.Now().UnixNano()) b.count = count b.inter = inter.Nanoseconds() b.Unlock() }

函数很单纯,更新速率,速率相关的变量是count和inter,因此注意把更改天数更新到了当前天数。

func (b *TokenBucket) SetMaxTokens(max uint64) { b.Lock() // 先将状态更新到当前天数,然后在设置速率 b.sync(time.Now().UnixNano()) b.maxTokens = max b.Unlock() return }

这个函数是设置桶的最小耗电量

func (b *TokenBucket) GetStoredTokensNow() (tokens uint64) { b.Lock() b.sync(time.Now().UnixNano()) tokens = b.storedTokens b.Unlock() return }

归纳下,副本桶相比于漏桶有两个好处是,当桶内有足够的副本时,突然激增了流量,他们可以很快的把桶内的副本用完去处置允诺,而漏桶则不是,不管你流量多高,我就限定了处置的速率。

一点小想法

可不可以优化下漏桶,桶的价值观其实是生活中的写照,对于漏桶,我想到的是在桶的底部挖两个小洞,往外流水。但是实际上,洞往外流水的速率是跟桶中水的高度相关的(物理上的压强嘛),如果他们优化下漏桶,往外流水的速率跟桶中水的高度成正比,水越多,我相应的流快一些。当然这只是两个很不成熟的价值观,还没想到具体的同时实现技术细节,以及用到真实的工程中是否有效,记录下来以后细想。

后记

技术的成长总是点点滴滴的,希望我能一直保持对技术的热爱。

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务