限流器
Table of Contents
限流算法 #
目前流行的限流算法有五种:
- 令牌桶
- 漏桶
- 固定窗口
- 滑动窗口
- 滑动日志
固定窗口 #
原理 #
固定窗口是最简单的限流算法,比如我们规定每分钟限制访问数量为10,那么可以以每分钟为单位来记录请求数。
缺点 #
固定窗口是存在BUG的,我们将时间拆分的更细:
对于10分钟和11分钟,访问数量都没有超过10,但是在10:30~11:30这一分钟内访问数量超过了10.
滑动窗口 #
固定窗口的缺点比较明显,为了弥补这一点,可以使用滑动窗口。滑动窗口的实现有多种:
- 将窗口拆的更小
- 基于概率的流量统计
- TCP中的滑动窗口
将窗口拆的更小 #
原理 #
为了解决固定窗口的BUG,我们可以将窗口拆的更小:从一分钟一个窗口改为一秒一个窗口。
在11:30进行请求时,会发现10:30~11:30发生的请求数已经达到了10个,因此会拒绝这个请求。
这种限流算法看上去就像是一个窗口在往前滑动,因此称为滑动窗口。
实现 #
一个简单的例子 #
- 构建一个60长度的数组,每个元素代表每秒的请求访问量
- 每次有请求访问时,判断60个元素内的访问量总和是否达到了上限
- 如果没达到上限,对应元素的请求访问量++
- 以上是一个简单的例子,也存在很多bug,但是能说明滑动窗口的基本原理
// example:
// sw := slidingWindow()
// for i := 0; i < 11; i++ {
// if sw() {
// fmt.Println("handled")
// } else {
// fmt.Println("forbidden")
// }
// }
//
// output:
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// forbidden
func slidingWindow() func() bool {
window := [60]int{}
lastSec := time.Now().Second()
return func() bool {
sum := 0
for _, v := range window {
sum += v
}
if sum >= 10 {
return false
}
sec := time.Now().Second()
idx := sec - lastSec
window[idx]++
return true
}
}
上边是一个非常简单的例子,有很多‘”bug“,但是实现了滑动窗口的基本功能
缺点 #
滑动窗口本质上就是将固定窗口的窗口大小变小,因此由窗口存在导致的数据突变仍未解决。
基于概率的流量统计 #
在固定窗口的基础上,通过概率来统计流量,如下图:
- 上一分钟请求数量为5
- 当前分钟请求数量为3
- 当前秒数为18
- 当前窗口占当前分钟为30%
- 当前窗口占上一分钟为70%
- 当前窗口流量为
5*70%+3 = 6.5
缺点 #
基于概率的流量统计只能缓解固定窗口的焦虑,但是仍不能彻底解决数据突变的问题。
TCP中的滑动窗口 #
不管是固定窗口还是上边的滑动窗口,存在的问题都是窗口中的统计数可能非常大,导致边界值异常。既然窗口中的统计数不稳定会造成问题,那么我们就将统计数固定为1好了。
[图片来源于Carson的博客](TCP Send Window, Receive Window, and How it Works | by Carson | Medium)
上图是TCP中的发送端的滑动窗口,每个小窗口代表一个字节,一个个窗口组成了队列。在这个队列中,使用指针来将队列分割成几种状态,图中绿色的部分就代表可以发送的窗口大小。
如果想更深入的了解TCP中的滑动窗口,可以看这篇Carson的博客。
滑动日志 #
滑动日志需要记录每个请求以及请求时间,这样就能够精准计算出”当前窗口“内的请求数量。
但是这个方法太耗内存,因此很少使用。
漏桶 #
原理 #
- 将请求放到漏桶中
- 如果漏桶已满,则丢弃请求(如返回429状态码)
- 设置一个消费者以固定的速率从漏桶中获取请求
实现 #
可以使用golang的channel实现一个简单的漏桶:
// 返回获取令牌的方法,如果在1s内能够获取到返回true,否则返回false
// example:
// func main() {
// putReq := leakBucket()
//
// wg := sync.WaitGroup{}
// wg.Add(15)
// for i := 0; i < 15; i++ {
// go func(i int) {
// defer wg.Done()
// if putReq(i) {
// fmt.Println(time.Now().UnixMilli(), "queued")
// } else {
// fmt.Println(time.Now().UnixMilli(), "forbidden")
// }
// }(i)
// }
// wg.Wait()
// time.Sleep(time.Second * 3)
// }
//
// output:
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 handle req 14
// 1692868330776 forbidden
// 1692868330776 forbidden
// 1692868330776 forbidden
// 1692868330776 forbidden
// 1692868330865 handle req 4
// 1692868330966 handle req 3
// 1692868331066 handle req 5
// 1692868331167 handle req 1
// 1692868331268 handle req 6
// 1692868331369 handle req 7
// 1692868331470 handle req 8
// 1692868331571 handle req 11
// 1692868331672 handle req 12
// 1692868331773 handle req 13
type req any
func leakBucket() func(r req) bool {
const bucketCap = 10
leakBucket := make(chan req, bucketCap)
go func() {
// 每100毫秒钟消费一个请求
for req := range leakBucket {
handleReq(req)
time.Sleep(time.Millisecond * 100)
}
}()
// 注入请求,如果在10ms内注入返回true,未能注入返回false
putReq := func(r req) bool {
timeout := time.After(10 * time.Millisecond)
select {
case <-timeout:
return false
case leakBucket <- r:
return true
}
}
return putReq
}
func handleReq(req req) {
fmt.Println(time.Now().UnixMilli(), "handle req", req)
}
限制 #
漏桶的缺点在于请求被异步处理了,因此可以用于处理事件,但不适合处理用户的API请求。
令牌桶 #
原理 #
- 有一个组件固定向令牌桶中以一个固定速率注入令牌
- 如果令牌桶已经满了,则丢弃令牌
- 每个请求都需要从令牌中中拿走一个令牌
- 获取不到令牌的请求会被拒绝访问
实现 #
基于channel #
用golang中的channel能够很方便的实现令牌桶:
// 返回获取令牌的方法,如果在1s内能够获取到返回true,否则返回false
// example:
//
// func main() {
// getToken := getTokenFn()
//
// wg := sync.WaitGroup{}
// wg.Add(15)
// for i := 0; i < 15; i++ {
// go func() {
// defer wg.Done()
// if !getToken() {
// fmt.Println("not get token")
// return
// }
// fmt.Println("get token")
// }()
// }
// wg.Wait()
// }
//
// output:
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// not get token
// not get token
// not get token
// not get token
// not get token
func getTokenFn() func() bool {
type token struct{}
const bucketCap = 100
tokenBucket := make(chan token, bucketCap)
go func() {
for {
// 每100毫秒钟注入一个令牌,如果令牌桶已满,则丢弃令牌
if len(tokenBucket) < bucketCap {
tokenBucket <- token{}
}
time.Sleep(time.Millisecond * 100)
}
}()
// 获取bucket,如果在1s内获取到返回true,获取不到返回false
getToken := func() bool {
timeout := time.After(time.Second)
select {
case <-timeout:
return false
case <-tokenBucket:
return true
}
}
return getToken
}
这个实现非常简单,如果想要更丰富的功能,可以使用golang.org/x/time/rate
库.
golang.org/x/time/rate #
golang.org/x/time/rate
库支持更丰富的功能,比如:
- 支持一次拿多个token
- 支持设置每次拿到的最多的token数
- 可以获取当前令牌桶中的token数
- 支持预取,当令牌桶中的token不够时等待,直到token数量足够
- 。。。
golang.org/x/time/rate
并没有使用channel,而是基于锁+计数的方式实现。
基于redis #
上述两个实现都是在内存中进行实现,生产环境中往往需要多个服务共用一个令牌桶,我们选择reids进行实现:
package limiter
import (
"context"
"fmt"
"github.com/go-redis/redis"
xRate "golang.org/x/time/rate"
"strconv"
"sync"
"sync/atomic"
"time"
)
const (
tokenFormat = "{%s}.tokens"
timestampFormat = "{%s}.ts"
pingInterval = time.Millisecond * 100
)
var script = redis.NewScript(`local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
last_tokens = capacity
end
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
last_refreshed = 0
end
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
if allowed then
new_tokens = filled_tokens - requested
end
redis.call("setex", KEYS[1], ttl, new_tokens)
redis.call("setex", KEYS[2], ttl, now)
return allowed`)
type TokenLimiter struct {
rate int // 每秒生产速率
burst int // 桶容量
client *redis.Client // 存储容器
tokenKey string // redis key
timestampKey string // 桶刷新时间key
rescueLock sync.Mutex // lock
redisAlive uint32 // redis健康标识
monitorStarted bool // redis监控探测任务标识
rescueLimiter *xRate.Limiter // redis故障时采用进程内 令牌桶限流器
}
func NewTokenLimiter(rate, burst int, client *redis.Client, key string) *TokenLimiter {
tokenKey := fmt.Sprintf(tokenFormat, key)
timestampKey := fmt.Sprintf(timestampFormat, key)
return &TokenLimiter{
rate: rate,
burst: burst,
client: client,
tokenKey: tokenKey,
timestampKey: timestampKey,
redisAlive: 1,
rescueLimiter: xRate.NewLimiter(xRate.Every(time.Second/time.Duration(rate)), burst),
}
}
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
return lim.AllowNCtx(ctx, time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(context.Background(), now, n)
}
// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
return lim.reserveN(ctx, now, n)
}
func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
if atomic.LoadUint32(&lim.redisAlive) == 0 {
return lim.rescueLimiter.AllowN(now, n)
}
resp, err := script.Run(lim.client,
[]string{
lim.tokenKey,
lim.timestampKey,
},
[]string{
strconv.Itoa(lim.rate),
strconv.Itoa(lim.burst),
strconv.FormatInt(now.Unix(), 10),
strconv.Itoa(n),
}).Result()
// redis allowed == false
// Lua boolean false -> r Nil bulk reply
if err == redis.Nil {
return false
}
if err == context.DeadlineExceeded || err == context.Canceled {
return false
}
if err != nil {
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
code, ok := resp.(int64)
if !ok {
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
// redis allowed == true
// Lua boolean true -> r integer reply with value of 1
return code == 1
}
func (lim *TokenLimiter) startMonitor() {
lim.rescueLock.Lock()
defer lim.rescueLock.Unlock()
if lim.monitorStarted {
return
}
lim.monitorStarted = true
atomic.StoreUint32(&lim.redisAlive, 0)
go lim.waitForRedis()
}
func (lim *TokenLimiter) waitForRedis() {
ticker := time.NewTicker(pingInterval)
defer func() {
ticker.Stop()
lim.rescueLock.Lock()
lim.monitorStarted = false
lim.rescueLock.Unlock()
}()
for range ticker.C {
_, err := lim.client.Ping().Result()
if err == nil {
atomic.StoreUint32(&lim.redisAlive, 1)
return
}
}
}
相关阅读 #
- TCP Send Window, Receive Window, and How it Works | by Carson | Medium
- 五种限流算法,七种限流方式,挡住突发流量?-限流的几种方式 (51cto.com)
- 《System Design Interview - An Insider’s Guide》