Skip to main content
  1. internet/

限流器

·2892 words·6 mins·

限流算法 #

目前流行的限流算法有五种:

  • 令牌桶
  • 漏桶
  • 固定窗口
  • 滑动窗口
  • 滑动日志

固定窗口 #

原理 #

固定窗口是最简单的限流算法,比如我们规定每分钟限制访问数量为10,那么可以以每分钟为单位来记录请求数。

缺点 #

固定窗口是存在BUG的,我们将时间拆分的更细:

对于10分钟和11分钟,访问数量都没有超过10,但是在10:30~11:30这一分钟内访问数量超过了10.

滑动窗口 #

固定窗口的缺点比较明显,为了弥补这一点,可以使用滑动窗口。滑动窗口的实现有多种:

  • 将窗口拆的更小
  • 基于概率的流量统计
  • TCP中的滑动窗口

将窗口拆的更小 #

原理 #

为了解决固定窗口的BUG,我们可以将窗口拆的更小:从一分钟一个窗口改为一秒一个窗口。

在11:30进行请求时,会发现10:30~11:30发生的请求数已经达到了10个,因此会拒绝这个请求。

这种限流算法看上去就像是一个窗口在往前滑动,因此称为滑动窗口。

实现 #

一个简单的例子 #
  1. 构建一个60长度的数组,每个元素代表每秒的请求访问量
  2. 每次有请求访问时,判断60个元素内的访问量总和是否达到了上限
  3. 如果没达到上限,对应元素的请求访问量++
  4. 以上是一个简单的例子,也存在很多bug,但是能说明滑动窗口的基本原理
// example:
//	 sw := slidingWindow()
//		for i := 0; i < 11; i++ {
//			if sw() {
//				fmt.Println("handled")
//			} else {
//				fmt.Println("forbidden")
//			}
//		}
//
// output:
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// handled
// forbidden
func slidingWindow() func() bool {
	window := [60]int{}
	lastSec := time.Now().Second()
	return func() bool {
		sum := 0
		for _, v := range window {
			sum += v
		}
		if sum >= 10 {
			return false
		}
		sec := time.Now().Second()
		idx := sec - lastSec
		window[idx]++
		return true
	}
}

上边是一个非常简单的例子,有很多‘”bug“,但是实现了滑动窗口的基本功能

缺点 #

滑动窗口本质上就是将固定窗口的窗口大小变小,因此由窗口存在导致的数据突变仍未解决。

基于概率的流量统计 #

在固定窗口的基础上,通过概率来统计流量,如下图:

  1. 上一分钟请求数量为5
  2. 当前分钟请求数量为3
  3. 当前秒数为18
  4. 当前窗口占当前分钟为30%
  5. 当前窗口占上一分钟为70%
  6. 当前窗口流量为5*70%+3 = 6.5

缺点 #

基于概率的流量统计只能缓解固定窗口的焦虑,但是仍不能彻底解决数据突变的问题。

TCP中的滑动窗口 #

不管是固定窗口还是上边的滑动窗口,存在的问题都是窗口中的统计数可能非常大,导致边界值异常。既然窗口中的统计数不稳定会造成问题,那么我们就将统计数固定为1好了。

[图片来源于Carson的博客](TCP Send Window, Receive Window, and How it Works | by Carson | Medium)

上图是TCP中的发送端的滑动窗口,每个小窗口代表一个字节,一个个窗口组成了队列。在这个队列中,使用指针来将队列分割成几种状态,图中绿色的部分就代表可以发送的窗口大小。

如果想更深入的了解TCP中的滑动窗口,可以看这篇Carson的博客

滑动日志 #

滑动日志需要记录每个请求以及请求时间,这样就能够精准计算出”当前窗口“内的请求数量。

但是这个方法太耗内存,因此很少使用。

漏桶 #

原理 #

  1. 将请求放到漏桶中
  2. 如果漏桶已满,则丢弃请求(如返回429状态码)
  3. 设置一个消费者以固定的速率从漏桶中获取请求

leak_bucket.png

实现 #

可以使用golang的channel实现一个简单的漏桶:

// 返回获取令牌的方法,如果在1s内能够获取到返回true,否则返回false
// example:
// func main() {
// 	putReq := leakBucket()
// 
// 	wg := sync.WaitGroup{}
// 	wg.Add(15)
// 	for i := 0; i < 15; i++ {
// 		go func(i int) {
// 			defer wg.Done()
// 			if putReq(i) {
// 				fmt.Println(time.Now().UnixMilli(), "queued")
// 			} else {
// 				fmt.Println(time.Now().UnixMilli(), "forbidden")
// 			}
// 		}(i)
// 	}
// 	wg.Wait()
// 	time.Sleep(time.Second * 3)
// }
//
// output:
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 queued
// 1692868330765 handle req 14
// 1692868330776 forbidden
// 1692868330776 forbidden
// 1692868330776 forbidden
// 1692868330776 forbidden
// 1692868330865 handle req 4
// 1692868330966 handle req 3
// 1692868331066 handle req 5
// 1692868331167 handle req 1
// 1692868331268 handle req 6
// 1692868331369 handle req 7
// 1692868331470 handle req 8
// 1692868331571 handle req 11
// 1692868331672 handle req 12
// 1692868331773 handle req 13

type req any

func leakBucket() func(r req) bool {
	const bucketCap = 10
	leakBucket := make(chan req, bucketCap)

	go func() {
		// 每100毫秒钟消费一个请求
		for req := range leakBucket {
			handleReq(req)
			time.Sleep(time.Millisecond * 100)
		}
	}()

	// 注入请求,如果在10ms内注入返回true,未能注入返回false
	putReq := func(r req) bool {
		timeout := time.After(10 * time.Millisecond)
		select {
		case <-timeout:
			return false
		case leakBucket <- r:
			return true
		}
	}
	return putReq
}

func handleReq(req req) {
	fmt.Println(time.Now().UnixMilli(), "handle req", req)
}

限制 #

漏桶的缺点在于请求被异步处理了,因此可以用于处理事件,但不适合处理用户的API请求。

令牌桶 #

原理 #

  1. 有一个组件固定向令牌桶中以一个固定速率注入令牌
  2. 如果令牌桶已经满了,则丢弃令牌
  3. 每个请求都需要从令牌中中拿走一个令牌
  4. 获取不到令牌的请求会被拒绝访问

实现 #

基于channel #

用golang中的channel能够很方便的实现令牌桶:

// 返回获取令牌的方法,如果在1s内能够获取到返回true,否则返回false
// example:
//
//	func main() {
//		getToken := getTokenFn()
//
//		wg := sync.WaitGroup{}
//		wg.Add(15)
//		for i := 0; i < 15; i++ {
//			go func() {
//				defer wg.Done()
//				if !getToken() {
//					fmt.Println("not get token")
//					return
//				}
//				fmt.Println("get token")
//			 }()
//		  }
//		  wg.Wait()
//	 }
//
// output:
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// get token
// not get token
// not get token
// not get token
// not get token
// not get token
func getTokenFn() func() bool {
	type token struct{}
	const bucketCap = 100
	tokenBucket := make(chan token, bucketCap)

	go func() {
		for {
			// 每100毫秒钟注入一个令牌,如果令牌桶已满,则丢弃令牌
			if len(tokenBucket) < bucketCap {
				tokenBucket <- token{}
			}
			time.Sleep(time.Millisecond * 100)
		}
	}()

	// 获取bucket,如果在1s内获取到返回true,获取不到返回false
	getToken := func() bool {
		timeout := time.After(time.Second)
		select {
		case <-timeout:
			return false
		case <-tokenBucket:
			return true
		}
	}
	return getToken
}

这个实现非常简单,如果想要更丰富的功能,可以使用golang.org/x/time/rate库.

golang.org/x/time/rate #

golang.org/x/time/rate库支持更丰富的功能,比如:

  1. 支持一次拿多个token
  2. 支持设置每次拿到的最多的token数
  3. 可以获取当前令牌桶中的token数
  4. 支持预取,当令牌桶中的token不够时等待,直到token数量足够
  5. 。。。

golang.org/x/time/rate并没有使用channel,而是基于锁+计数的方式实现。

基于redis #

上述两个实现都是在内存中进行实现,生产环境中往往需要多个服务共用一个令牌桶,我们选择reids进行实现:

package limiter

import (
	"context"
	"fmt"
	"github.com/go-redis/redis"
	xRate "golang.org/x/time/rate"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

const (
	tokenFormat     = "{%s}.tokens"
	timestampFormat = "{%s}.ts"
	pingInterval    = time.Millisecond * 100
)

var script = redis.NewScript(`local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
    last_tokens = capacity
end
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
    last_refreshed = 0
end
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
if allowed then
    new_tokens = filled_tokens - requested
end
redis.call("setex", KEYS[1], ttl, new_tokens)
redis.call("setex", KEYS[2], ttl, now)
return allowed`)

type TokenLimiter struct {
	rate           int            // 每秒生产速率
	burst          int            // 桶容量
	client         *redis.Client  // 存储容器
	tokenKey       string         // redis key
	timestampKey   string         // 桶刷新时间key
	rescueLock     sync.Mutex     // lock
	redisAlive     uint32         // redis健康标识
	monitorStarted bool           // redis监控探测任务标识
	rescueLimiter  *xRate.Limiter // redis故障时采用进程内 令牌桶限流器
}

func NewTokenLimiter(rate, burst int, client *redis.Client, key string) *TokenLimiter {
	tokenKey := fmt.Sprintf(tokenFormat, key)
	timestampKey := fmt.Sprintf(timestampFormat, key)

	return &TokenLimiter{
		rate:          rate,
		burst:         burst,
		client:        client,
		tokenKey:      tokenKey,
		timestampKey:  timestampKey,
		redisAlive:    1,
		rescueLimiter: xRate.NewLimiter(xRate.Every(time.Second/time.Duration(rate)), burst),
	}
}

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}

// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
	return lim.AllowNCtx(ctx, time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(context.Background(), now, n)
}

// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
	return lim.reserveN(ctx, now, n)
}

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
	if atomic.LoadUint32(&lim.redisAlive) == 0 {
		return lim.rescueLimiter.AllowN(now, n)
	}

	resp, err := script.Run(lim.client,
		[]string{
			lim.tokenKey,
			lim.timestampKey,
		},
		[]string{
			strconv.Itoa(lim.rate),
			strconv.Itoa(lim.burst),
			strconv.FormatInt(now.Unix(), 10),
			strconv.Itoa(n),
		}).Result()
	// redis allowed == false
	// Lua boolean false -> r Nil bulk reply
	if err == redis.Nil {
		return false
	}
	if err == context.DeadlineExceeded || err == context.Canceled {
		return false
	}
	if err != nil {
		lim.startMonitor()
		return lim.rescueLimiter.AllowN(now, n)
	}

	code, ok := resp.(int64)
	if !ok {
		lim.startMonitor()
		return lim.rescueLimiter.AllowN(now, n)
	}

	// redis allowed == true
	// Lua boolean true -> r integer reply with value of 1
	return code == 1
}

func (lim *TokenLimiter) startMonitor() {
	lim.rescueLock.Lock()
	defer lim.rescueLock.Unlock()

	if lim.monitorStarted {
		return
	}

	lim.monitorStarted = true
	atomic.StoreUint32(&lim.redisAlive, 0)

	go lim.waitForRedis()
}

func (lim *TokenLimiter) waitForRedis() {
	ticker := time.NewTicker(pingInterval)
	defer func() {
		ticker.Stop()
		lim.rescueLock.Lock()
		lim.monitorStarted = false
		lim.rescueLock.Unlock()
	}()

	for range ticker.C {
		_, err := lim.client.Ping().Result()
		if err == nil {
			atomic.StoreUint32(&lim.redisAlive, 1)
			return
		}
	}
}

相关阅读 #