golang 中基于 IP 地址的 HTTP 限流
目前,网络上垃圾留言机器人太猖獗,我这个小破站曾经就被疯狂的灌水,太难了。。。。
所以令牌桶这个需求也就随之诞生了。。。其实就是限制 同一IP
发送评论的频率,
恰好我这个小破站是用 golang 开发的,恰好 Golang 官方提供的扩展库里就自带了限流算法的实现,即 golang.org/x/time/rate
。该限流器也是基于 Token Bucket(令牌桶) 实现的。
time/rate
包的 Limiter
类型对限流器进行了定义,所有限流功能都是通过基于 Limiter
类型实现的,其内部结构如下
type Limiter struct {
mu sync.Mutex
limit Limit
burst int // 令牌通的大小
tokens float64
last time.Time // 上次更新tokens的时间
lastEvent time.Time // 上次发生限速事件的时间(通过或者限制都是限速器事件)
}
字段的作用:
- limit: limit 字段表示往桶里放 Token 的速率,它的类型是 Limit,是 int64 的类型别名。设置 limit 时既可以用数字指定每秒向桶中放多少个 Token,也可以指定向桶中放入 Token 的时间间隔,其实指定了每秒放 Token 的个数后就能计算出放每个 Token 的时间间隔了。
- burst: 令牌桶的大小。
- tokens:桶中的令牌。
- last:上次往桶中放 Token 的时间。
- lastEvent:上次发生限速器事件的时间(通过或者限制都是限速器事件)
- 可以看到在 timer/rate 的限流器实现中,并没有单独维护一个 Timer 和队列去真的每隔一段时间向桶中放令牌,而是仅仅通过计数的方式表示桶中剩余的令牌。每次消费取 Token 之前会先根据上次更新令牌数的时间差更新桶中 Token 数。
构造限流器构造一个限流器对象
limiter := rate.NewLimiter(10, 100)
这里有两个参数:
第一个参数是 r Limit,设置的是限流器 Limiter 的 limit 字段,代表每秒可以向 Token 桶中产生多少 token。Limit 实际上是 float64 的别名。
第二个参数是 b int,b 代表 Token 桶的容量大小,也就是设置的限流器 Limiter 的 burst 字段。
对于以上例子来说,其构造出的限流器的令牌桶大小为 100, 以每秒 10 个 Token 的速率向桶中放置 Token。
除了给r Limit参数直接指定每秒产生的 Token 个数外,还可以用 Every 方法来指定向桶中放置 Token 的间隔,例如:
limit := rate.Every(100 * time.Millisecond)
limiter := rate.NewLimiter(limit, 100)
以上就表示每 100ms 往桶中放一个 Token。本质上也是一秒钟往桶里放 10 个。
使用限流器
Limiter 提供了三类方法供程序消费 Token,可以每次消费一个 Token,也可以一次性消费多个 Token。每种方法代表了当 Token 不足时,各自不同的对应手段,可以阻塞等待桶中Token补充,也可以直接返回取Token失败。
Wait/WaitN
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
Wait 实际上就是 WaitN(ctx, 1)
。
当使用 Wait 方法消费 Token 时,如果此时桶内 Token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件。如果充足则直接返回。
这里可以看到,Wait 方法有一个 context 参数。我们可以设置 context 的 Deadline 或者 Timeout,来决定此次 Wait 的最长时间。
// 一直等到获取到桶中的令牌
err := limiter.Wait(context.Background())
if err != nil {
fmt.Println("Error: ", err)
}
// 设置一秒的等待超时时间
ctx, _ := context.WithTimeout(context.Background(), time.Second*1)
err := limiter.Wait(ctx)
if err != nil {
fmt.Println("Error: ", err)
}
Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
Allow 实际上就是对 AllowN(time.Now(), 1)
进行简化的函数。
AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之不消费桶中的Token,返回false。
对应线上的使用场景是,如果请求速率超过限制,就直接丢弃超频后的请求。
if limiter.AllowN(time.Now(), 2) {
fmt.Println("event allowed")
} else {
fmt.Println("event not allowed")
}
Reserve/ReserveN
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
Reserve 相当于 ReserveN(time.Now(), 1)
。
ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 Token 是否充足,都会返回一个 *Reservation 对象。你可以调用该对象的 Delay() 方法,该方法返回的参数类型为 time.Duration,反映了需要等待的时间,必须等到等待时间之后,才能进行接下来的工作。如果不想等待,可以调用 Cancel() 方法,该方法会将 Token 归还。
举一个简单的例子,我们可以这么使用 Reserve 方法。
r := limiter.Reserve()
f !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
Act() // 执行相关逻辑
动态调整速率和桶大小
Limiter 支持创建后动态调整速率和桶大小:
SetLimit(Limit) 改变放入 Token 的速率
SetBurst(int) 改变 Token 桶大小
有了这两个方法,可以根据现有环境和条件以及我们的需求,动态地改变 Token 桶大小和速率。
总结
今天总结了 Golang 官方限流器的使用方法,它是一种令牌桶算实现的限流器。其中 Wait/WaitN,Allow/AllowN 这两组方法在平时用的比较多,前者是消费Token时如果桶中Token不足可以让程序等待桶中新Token的放入(最好设置上等待时长)后者则是在桶中的Token不足时选择直接丢弃请求。
除了Golang官方提供的限流器实现,Uber公司开源的限流器uber-go/ratelimit
也是一个很好的选择,与Golang官方限流器不同的是Uber的限流器是通过漏桶算法实现的,不过对传统的漏桶算法进行了改良。
好上面一大堆都是抄自 https://blog.51cto.com/niuben/3279635
下面写一个简单的小例子
package limiter
import (
"sync"
"golang.org/x/time/rate"
)
// IPRateLimiter .
type IPRateLimiter struct {
ips map[string]*rate.Limiter
mu *sync.RWMutex
r rate.Limit
b int
}
// NewIPRateLimiter .
func NewIPRateLimiter(r rate.Limit, b int) *IPRateLimiter {
i := &IPRateLimiter{
ips: make(map[string]*rate.Limiter),
mu: &sync.RWMutex{},
r: r,
b: b,
}
return i
}
// AddIP creates a new rate limiter and adds it to the ips map,
// using the IP address as the key
func (i *IPRateLimiter) AddIP(ip string) *rate.Limiter {
i.mu.Lock()
defer i.mu.Unlock()
limiter := rate.NewLimiter(i.r, i.b)
i.ips[ip] = limiter
return limiter
}
// GetLimiter returns the rate limiter for the provided IP address if it exists.
// Otherwise calls AddIP to add IP address to the map
func (i *IPRateLimiter) GetIpLimiter(ip string) *rate.Limiter {
i.mu.Lock()
limiter, exists := i.ips[ip]
if !exists {
i.mu.Unlock()
return i.AddIP(ip)
}
i.mu.Unlock()
return limiter
}
用法
ip := request.GetIp()
limit := limiter.NewIPRateLimiter(10, 100)
limiter := limiter.GetLimiter(ip)
if !limiter.Allow() {
http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
return
}