序言
开闭的其原因就不多说了,一句话,为的是避免服务项目被高QPS击垮。在听第四组撷取A服务项目的这时候,看见我们探讨开闭的思路,当中说到副本桶,又说到了漏桶,以后只介绍副本桶的价值观,漏桶没是不是介绍过,趁此次把开闭方式归纳下,因此把同时实现技术细节通通介绍,称得上两个小的自学。此次主要就说的开闭方式主要包括单纯开闭、漏桶、副本桶。
单纯开闭
说下他们开闭的路子,允诺的速率他们这儿主要是以每秒钟处置允诺数目QPS为依据,简而言之开闭是我的USB承若的QPS最低每秒钟处置1W条允诺,当允诺少于1W的这时候,他们要把低于1W的允诺给婉拒掉,只处置那当中1W条。他们先来看一看单纯的开闭怎样做呢?话不多说,看标识符
// RateLimiter 速率限制器
type RateLimiter struct {
rate uint64 //开闭的速率
allowance uint64 //桶余下的耗电量
max uint64 //桶最小耗电量
unit uint64 //一场允诺的模块天数片
lastCheck uint64 //那次初始化的天数点
}
看一看建立速率限制器的标识符
// New 建立RateLimiter示例
func New(rate int, per time.Duration) *RateLimiter {
nano := uint64(per)
if nano < 1 {
nano = uint64(time.Second)
}
if rate < 1 {
rate = 1
}
return &RateLimiter{
rate: uint64(rate),
allowance: uint64(rate) * nano,
max: uint64(rate) * nano,
unit: nano,
lastCheck: unixNano(),
}
}
分析下标识符,函数传进来的rate参数是限定处置的最小允诺速率(两个天数周期内),per指的是两个天数周期的长度,比如rate为100,per为2*time.Second 则代表,他们的最小速率是2秒钟处置100次允诺。nano是把传进来的per转成了int64的整数,看起来是精确到了纳秒级别。allowance初始化为rate*nano好理解,因为刚开始桶是空的。
ok,他们来看这个开闭器是怎样使用
func main() {
// rate limit: simple
rl := simpleratelimit.New(10, time.Second)
for i := 0; i < 100; i++ {
log.Printf(“limit result: %v\n”, rl.Limit())
}
}
很单纯,他们建立两个每秒钟钟最低10次的开闭器,然后接着初始化开闭器的Limit()函数,来判定此次允诺是否需要被限制,返回true则婉拒,返回false则处置。ok,他们来看下Limit函数
// Limit 判断是否少于限制
func (rl *RateLimiter) Limit() bool {
now := unixNano()
passed := now – atomic.SwapUint64(&rl.lastCheck, now)
rate := atomic.LoadUint64(&rl.rate)
current := atomic.AddUint64(&rl.allowance, passed*rate)
if max := atomic.LoadUint64(&rl.max); current > max {
atomic.AddUint64(&rl.allowance, max–current)
current = max
}
if current < rl.unit {
return true
}
// 没少于限额
atomic.AddUint64(&rl.allowance, –rl.unit)
return false
}
分析下过程:
1.首先now为当前天数点,passed为当前天数减去那次允诺的天数点,即为距离那次允诺过去天数,注意SwapUint64顺带更新了lastCheck为当前天数。
2.接着读取下速率rate
3.然后计算current=passed*rate,这儿很有意思,刚开始还没看懂,实际上你理解下,passed是距离那次的天数,rate是速率,相乘代表什么?等会解释。
4.往下看有两个if判断,先读取桶的最小耗电量,然后比较current与max的大小,当大于max的这时候,更新allowance,他们知道这这时候实际上current是等于更新allowance的,更新的方式
atomic.AddUint64(&rl.allowance, max–current)
这不是把allowance更新为max了吗?然后把current=max。
到这儿,基本已经明白了,上一步的
current := atomic.AddUint64(&rl.allowance, passed*rate)
实际上在干什么,其实是在往桶里放水的过程,这个水你可以认为是一段天数(实际天数乘以速率rate),current是放进去这段天数之后的当前桶的耗电量。而这一步是为的是判断放进去的水是否少于桶的最小耗电量,当少于了,则更正为max。
5.接下来,判断当前耗电量current是否低于rl.unit,rl.unit是什么,不是一场允诺需要的天数吗?如果当前耗电量根本不够一场的允诺,必然需要给开闭的,所以返回true
6.如果current够一场允诺,则更显当前的配额,自然是减去一场允诺的天数。
最后放出余下的小部分标识符
// UpdateRate 更新速率值func (rl *RateLimiter) UpdateRate(rate int) {
atomic.StoreUint64(&rl.rate, uint64(rate))
atomic.StoreUint64(&rl.max, uint64(rate)*rl.unit)
}
// Undo 重置上一场初始化Limit(),返回没使用过的限额
func (rl *RateLimiter) Undo() {
current := atomic.AddUint64(&rl.allowance, rl.unit)
if max := atomic.LoadUint64(&rl.max); current > max {
atomic.AddUint64(&rl.allowance, max–current)
}
}
// unixNano 当前天数(纳秒)
func unixNano() uint64 {
return uint64(time.Now().UnixNano())
}
UpdateRate当然是更新rate了,不多说。Undo比较有意思了,把最近那次初始化使用的水又给放回去了,什么情况下会用到呢?假设你此次允诺发现使用方初始化的参数出错,实际上不占用你的下游QPS,你可以再把用的水放回去嘛。
接下来,他们思考下:
1.实际上这个同时实现方式的哲学价值观是什么呢?我每次桶里放水是允诺触发的,并不是他们异步的往桶里防水,会不会放水不及时呢?不会的,因为他们是先往桶里放水再判断是否开闭的。
2.为什么要把max设置为他们的rate*nano?他们不能把桶的最小耗电量设置的更大些吗?假设我把桶设置为100*nano会出现什么问题呢?
当桶初始化的这时候,桶的耗电量可以允许100次的允诺,假设瞬间过来了100个允诺,根据标识符是不会限额的,好吧看来rate主要是在限制桶的最小耗电量来达到速率的开闭。
3.你看Limit里面对开闭器的操作并没加锁,实际应用中他们的允诺必然是高并发的,这种情况会出现问题吗?
我认为有问题,比如他们再放入水更新allowance的这时候,
if max := atomic.LoadUint64(&rl.max); current > max {
atomic.AddUint64(&rl.allowance, max–current)
current = max
}
如果不做加锁,必然会可能造成两个协程先后判定if为true,接着先后都进入更新allowance,而两个都减去max-current会使得桶的耗电量小于max。
所以最好对桶的这块的操作加上锁,另外Undo的更新allowance同样需要,有必要对整个函数开始加锁,函数结束解锁吗?我觉得不必要,技术细节读者可以自行思考。
漏桶
漏桶又是什么价值观呢?单纯来说一句话,是你入水(允诺的速率)的速率无简而言之多大,我桶漏出水(处置)的速率是固定的,相信我们在网上也看见过很多漏桶的原理图,不再啰嗦,他们直接上标识符,以下标识符是Uber的开源库。
首先定义两个Limiter的USB
type Limiter interface {
// Take should block to make sure that the RPS is met. Take() time.Time
}
还有两个clock的USB,先放出来,稍后解释
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
然后他们给出它的开闭器的结构体
type limiter struct {
sync.Mutex
last time.Time //那次允诺的天数
sleepFor time.Duration //此次要允诺要sleep的天数 perRequest time.Duration //根据限定的速率得出的每次允诺分得的天数片
maxSlack time.Duration //稍后解释
clock Clock //时钟(稍后解释)
}
ok,他们来看下建立两个开闭器的函数
func New(rate int, opts …Option) Limiter {
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: –10 * time.Second / time.Duration(rate),
}
for _, opt := range opts {
opt(l)
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}
分析下过程:
1.首先他们的perRequest等于1秒钟所包含的纳秒数/他们设定的速率rate,这个好理解
2.maxSlack为-10乘以perRequest,这是为什么呢?他们稍后说。
3.接着,函数传进来了两个Option类型的数组,貌似Option还是函数类型,他们循环初始化这些函数。
type Option func(l *limiter)
嗯,这些函数应该是给开闭器进行配置的。
4.然后建立了两个clock对象,他们稍后看下这个clock的用处是什么。
接下来,他们看一看它的Take函数
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
if t.last.IsZero() {
t.last = now
return t.last
}
t.sleepFor += t.perRequest – now.Sub(t.last)
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}
分析下:
1.它加锁了,看起来符合他们的预期
2.首先读取t.clock.Now,ok他们还是来看一看这个clock到底是什么玩意
type clock struct{}
func New() Clock {
return &clock{}
}
func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) }
func (c *clock) AfterFunc(d time.Duration, f func()) {
// TODO maybe return timer interface time.AfterFunc(d, f)
}
func (c *clock) Now() time.Time { return time.Now() }
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
t.clock.Now不是time.Now()嘛,当前天数点,很好奇干嘛要包一层
3.接下来有两个if判断,t.last.IsZero() 如果为true的话,则把更新t.last为now,然后直接返回t.last,他们先看下IsZero函数
func (t Time) IsZero() bool {
return t.sec == 0 && t.nsec == 0
}
sec和nsec,这个time的天数点在公元第1年的00:00:00啊,注意看上面初始化他们并没初始化last,last是time.Time类型,默认初始化为0嘛,可不就IsZero()为true,为的是不放心,我写了个小测试标识符验证了下,当使用默认初始化time.Time的这时候,IsZero()确实为true。
同时,这也符合Uber官方给这段标识符的解释,说当第一场允诺的这时候直接返回。
4.接着更新了两个sleepFor,看起来好像是他们要睡多长天数的意思,看下它的计算方式
t.perRequest – now.Sub(t.last)
perRequest是每次允诺需要的天数片,now.Sub(t.last)是此次允诺距离那次允诺的天数差,喔,到这儿他们称得上明白Uber是是不是同时实现漏桶的了,我知道每次允诺需要的天数,我根据你那次允诺距离此次的天数差,如果到了天数,我就返回,如果没到,我就让你补上天数,是不是补,我让你睡一会。
注意:更新t.sleepFor的这时候,不是直接=,而是+=,为什么?因为以后t.sleepFor本身可能为负数,比如那次的允诺等了很长天数了,使得当时的sleepFor为很大的负数,此次+=之后还为负数,他们继续直接返回嘛,这样符合他们流出的速率要固定的道理。
5.接下来有两个判断
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
注意,maxSlack用到了吧,sleepFor比maxSlack还小的这时候,他们就让sleepFor赋值为maxSlack,他们回头看下maxSlack等于什么?
maxSlack: –10 * time.Second / time.Duration(rate),
又明白了, 它的意思是sleepFor不能太负,你只能负到-10*一场允诺的天数片,为什么呢?你想想,如果我某一场允诺距离那次允诺隔得天数特别长,此时
t.perRequest – now.Sub(t.last)
会负的特别大,那么后面如果突然有激增的流量过来,t.sleepFor +=之后还是负数,直接放过了流量,根本达不到开闭的作用,所以,他们把它控制在了-10倍,这个-10应该可以调节,目的是使得他们总的水的流速达到他们的要求,又不能有激增流量过来扛不住。
6.接下来就好理解了,sleepFor为正代表他们要睡会,该睡多长天数睡多长天数。
然后更新last,睡完了清理下sleepFor为0,如果小于0就代表不需要睡,更新下last
7.看起来clock是time.Time包了一层而已。
7.最后返回当前的天数。
最后,Uber给出了两个测试的用例
func TestExampleRatelimit(t *testing.T) {
rl := New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
if i > 0 {
fmt.Println(i, now.Sub(prev))
}
prev = now
}
// Output:
// 1 10ms
// 2 10ms
// 3 10ms // 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
}
很单纯,建立两个100QPS的开闭器,然后每次初始化Take,发现每次允诺的间隔都是10ms。
副本桶
漏桶有两个不好的地方,是不能处置那种激增的流量,因为流水的速率是固定的,即使资源够用的情况下,处置速率也不会变。于是就有了副本桶,副本桶其实跟他们第一种单纯开闭的路子一样,每次允诺的这时候都会往里面放副本,然后再从桶里取副本,桶有一定的耗电量限制,当桶里没副本的这时候则产生开闭。看标识符
// TokenBucket 副本桶
type TokenBucket struct {
lastModifiedTime int64 // 那次修改天数
storedTokens uint64 // 桶中存储的副本数
count uint64 // 每inter天数内产生count个副本
inter int64 // 产生count个副本的天数 maxTokens uint64 // 最小的副本数
sync.RWMutex
}
他们来看下怎样建立两个开闭器的
func New(count uint64, inter time.Duration, maxTokens uint64, tokensNow uint64, startTime time.Time) *TokenBucket {
return &TokenBucket{
count: count,
inter: inter.Nanoseconds(),
maxTokens: maxTokens,
storedTokens: tokensNow,
lastModifiedTime: startTime.UnixNano(),
}
}
解释下:
每隔inter天数产生count个副本,桶内最多存储maxTokens个副本,初始状态从startTime开始,桶内有tokensNow个副本。
他们来看下源码中给出的两个测试用例,他们随着测试用例看
func TestTokenBucketReserve(t *testing.T) {
startTime := time.Now()
bucket := New(3, time.Second, 5, 0, startTime)
if bucket.ReserveWithTime(1, startTime) {
t.Fatalf(“there should be 0 token at %s”, startTime.String())
}
t1 := startTime.Add(time.Second)
if !bucket.ReserveWithTime(3, t1) {
t.Fatalf(“there should be 3 tokens after 1 second”)
}
// 此时已经没了
if bucket.ReserveWithTime(1, t1) {
t.Fatal(“there should be 0 tokens after reserving 3 tokens”)
}
// 2秒后有5个token
t1 = t1.Add(2 * time.Second)
if !bucket.ReserveWithTime(5, t1) {
t.Fatalf(“there should be 5 tokens after 2 seconds”)
}
// 此时已经没了
if bucket.ReserveWithTime(1, t1) {
t.Fatal(“there should be 0 tokens after reserving all tokens”)
}
}
分析下:
1.首先建立两个副本桶,每秒钟产生3个,最多可存5个,在startTime时桶内只有0个token,startTime是当前天数。
func (b *TokenBucket) ReserveWithTime(count uint64, now time.Time) bool {
if count <= 0 {
return true
}
b.Lock()
b.sync(now.UnixNano())
storedTokens := b.storedTokens
if storedTokens < count {
b.Unlock()
return false
}
b.storedTokens -= count
b.Unlock()
return true
}
看
2).同样对开闭器的操作也是加锁的
3).初始化sync函数,把now传进去,他们看下在干嘛
func (b *TokenBucket) sync(nowNano int64) {
diff := nowNano – b.lastModifiedTime
if diff < 0 {
return
}
tokensToPut := uint64(diff/b.inter) * b.count
if tokensToPut < 1 {
return
}
if sum, e := b.checkedAddUint64(b.storedTokens, tokensToPut); e == nil {
if sum > b.maxTokens {
sum = b.maxTokens
}
b.storedTokens = sum
} else {
return
}
b.lastModifiedTime = nowNano
return
}
首先diff即为此次允诺距离那次允诺的天数,判断了两个小于0,会出现这种情况吗?其实是会的,比如当允诺很多的这时候,很多允诺与由于锁的其原因,被序列化在了锁等待队列里,当锁释放时,当时传进去的time必定比当前天数要小,所以这也恰恰表明了,哪个协程先抢着锁了,哪个协程放副本。接着计算了下需要放入桶中的副本数,diff/b.inter等于diff有多少个周期,count是两个周期放多少个副本。如果不够两个副本则直接返回,注意这儿都没更新lastModifiedTime,因为此次虽然没够两个副本,下次可能够呢,要攒着。接下来初始化了checkedAddUint64,看下这个函数在干什么
// sum = a + b,如果溢出,err不为nil。
func (b *TokenBucket) checkedAddUint64(n1, n2 uint64) (sum uint64, err error) {
sum = n1 + n2
if !(((n1 ^ n2) < 0) || ((n1 ^ sum) >= 0)) {
err = ErrOverflow
}
return
}
函数的作用是更新副本总数因此检查了下总数是否溢出,如果溢出则返回错误,我认为这儿有错误,n1, n2,sum 都是uint64的整数,
if !(((n1 ^ n2) < 0) || ((n1 ^ sum) >= 0))
永远是false,我认为应该是写成
if sum > n2
比较好,如果溢出了肯定是会小于n2的。这是我个人,如果你有不同意见,可以一起探讨下。
ok,他们继续上面的标识符,更新完总的副本数后,要检测下是否少于最小副本数了,如果少于了则改为最小副本数,这个好理解。最后把lastModifiedTime更新为当前天数。
5).sync简而言之同步的意思是更新了lastModifiedTime,因此放入了一定数目的副本。接下来就单纯了,读取 b.storedTok
基本上副本桶的主要就标识符他们看完了,他们看下其他几个辅助的函数
func (b *TokenBucket) SetRate(count uint64, inter time.Duration) {
b.Lock()
// 先将状态更新到当前天数,然后在设置速率
b.sync(time.Now().UnixNano())
b.count = count
b.inter = inter.Nanoseconds()
b.Unlock()
}
函数很单纯,更新速率,速率相关的变量是count和inter,因此注意把更改天数更新到了当前天数。
func (b *TokenBucket) SetMaxTokens(max uint64) {
b.Lock()
// 先将状态更新到当前天数,然后在设置速率
b.sync(time.Now().UnixNano())
b.maxTokens = max
b.Unlock()
return
}
这个函数是设置桶的最小耗电量
func (b *TokenBucket) GetStoredTokensNow() (tokens uint64) {
b.Lock()
b.sync(time.Now().UnixNano())
tokens = b.storedTokens
b.Unlock()
return
}
归纳下,副本桶相比于漏桶有两个好处是,当桶内有足够的副本时,突然激增了流量,他们可以很快的把桶内的副本用完去处置允诺,而漏桶则不是,不管你流量多高,我就限定了处置的速率。
一点小想法
可不可以优化下漏桶,桶的价值观其实是生活中的写照,对于漏桶,我想到的是在桶的底部挖两个小洞,往外流水。但是实际上,洞往外流水的速率是跟桶中水的高度相关的(物理上的压强嘛),如果他们优化下漏桶,往外流水的速率跟桶中水的高度成正比,水越多,我相应的流快一些。当然这只是两个很不成熟的价值观,还没想到具体的同时实现技术细节,以及用到真实的工程中是否有效,记录下来以后细想。
后记
技术的成长总是点点滴滴的,希望我能一直保持对技术的热爱。