背景
最近在项目开发中涉及 AI 网关的限频调用,以及任务消息消费的动态限速调整,正好都用到了令牌桶,这里简要做下记录
rate 限流器包简介
golang.org/x/time/rate
包是 Go 语言中一个非常实用的限流器实现,它基于令牌桶算法。
这个包提供了一种简单而强大的方式来控制操作的速率,防止系统过载,或者在需要遵守特定速率限制(例如,调用第三方 API)时使用。
rate
包的核心是令牌桶算法。想象一个桶,里面装着“令牌”。
- 令牌生成速率 (Rate): 令牌以固定的速率(例如,每秒 10 个)被放入桶中。
- 桶容量 (Burst): 桶有一个最大容量。如果桶满了,新生成的令牌就会被丢弃。
- 请求消耗令牌: 当一个请求到来时,它会尝试从桶中取走一个令牌。
- 如果桶中有足够的令牌,请求立即被允许执行,并消耗一个令牌。
- 如果桶中没有足够的令牌,请求必须等待,直到有新的令牌生成并放入桶中。
这种机制确保了在短时间内可以处理突发请求(利用桶中已有的令牌),但长期来看,请求的平均速率不会超过令牌的生成速率。
rate 包方法简要说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| // - r (rate.Limit): 这是令牌的生成速率,表示每秒允许的事件数
// - b (int): 这是令牌桶的容量(burst),表示桶中最多可以存储多少个令牌。它决定了限流器可以处理的突发请求数量。
func NewLimiter(r Limit, b int) *Limiter
// 非阻塞。如果桶中有足够的令牌,它会消耗一个令牌并返回 true;否则,返回 false。
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
// 阻塞。如果桶中没有令牌,它会阻塞当前 goroutine,等待直到有令牌可用。
// ctx 参数:
// - 如果 ctx 被取消(例如,context.WithCancel 或 context.WithTimeout),Wait 会立即返回 ctx.Err()。
// - 如果 ctx 为 nil,则不会有超时或取消,Wait 会一直等待直到令牌可用。
func (lim *Limiter) Wait(ctx context.Context) error
func (lim *Limiter) WaitN(ctx context.Context, n int) error
// 这些方法用于预留令牌,但不会立即消耗。
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
// 动态调整速率
limiter.SetLimit(rate.Limit(20)) // 调整为每秒20个
limiter.SetBurst(10) // 调整容量为10
|
简单代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| import (
"fmt"
"time"
"golang.org/x/time/rate"
)
func main() {
// 创建一个限流器:每秒允许10个事件,桶容量为5
// 这意味着:
// - 长期平均速率不会超过每秒10个请求。
// - 短时间内,可以处理最多5个突发请求(如果桶是满的)。
limiter := rate.NewLimiter(rate.Limit(10), 5)
fmt.Printf("Limiter created: Rate=%v, Burst=%v\n", limiter.Limit(), limiter.Burst())
// 尝试获取令牌
// Allow() 是非阻塞的,消耗一个token,立即返回是否允许
if limiter.Allow() {
fmt.Println("Request 1 allowed immediately.")
} else {
fmt.Println("Request 1 denied immediately.")
}
// Wait() 是阻塞的,直到获取到令牌
fmt.Println("Waiting for a token...")
err := limiter.Wait(nil) // nil context表示没有超时或取消
if err != nil {
fmt.Printf("Wait error: %v\n", err)
} else {
fmt.Println("Request 2 allowed after waiting (if necessary).")
}
}
|
使用令牌桶的常见场景
API 限速调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // 限制对某个第三方API的调用,例如每秒100次
apiLimiter := rate.NewLimiter(rate.Limit(100), 100)
func callExternalAPI() {
if err := apiLimiter.Wait(context.Background()); err != nil {
fmt.Println("Failed to acquire token for API call:", err)
return
}
// 实际调用API的逻辑
fmt.Println("Calling external API...")
}
// 在goroutine中并发调用
for i := 0; i < 200; i++ {
go callExternalAPI()
}
|
防止服务过载(中间件,单节点限流)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| // 限制服务器处理请求的速率,例如每秒500个请求
requestLimiter := rate.NewLimiter(rate.Limit(500), 500)
func handleRequest(req interface{}) {
if !requestLimiter.Allow() {
// 请求过多,直接拒绝或返回错误
fmt.Println("Request denied: Too many requests.")
return
}
// 处理请求的逻辑
fmt.Println("Processing request...")
}
// 在HTTP服务器的Handler中
// http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// if !requestLimiter.Allow() {
// http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
// return
// }
// // ... 处理请求 ...
// })
|
控制后台任务执行速率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // 限制后台任务的执行速率,例如每分钟10个任务
taskLimiter := rate.NewLimiter(rate.Every(time.Minute/10), 1)
func processTask(taskID int) {
fmt.Printf("Processing task %d...\n", taskID)
time.Sleep(50 * time.Millisecond) // 模拟任务耗时
}
func main() {
fmt.Println("\n--- Background Task Limiting ---")
for i := 0; i < 20; i++ {
// 阻塞等待令牌,确保任务不会过快执行
if err := taskLimiter.Wait(context.Background()); err != nil {
fmt.Printf("Error waiting for task %d: %v\n", i, err)
continue
}
go processTask(i)
}
time.Sleep(2 * time.Minute) // 等待所有任务完成
}
|
注意事项和最佳实践
- 选择合适的速率和容量:
- 速率 (Rate): 根据你的业务需求和外部服务的限制来设定。
- 容量 (Burst): 决定了你的系统能承受多大的瞬时流量冲击。容量越大,能处理的突发请求越多,但也会导致在突发情况下消耗更多资源。通常,容量应该至少等于速率,或者略大于速率,以应对短时间的流量高峰。
- 如果
burst
设置为 0
,则表示没有突发能力,每次请求都必须严格按照速率来。但通常不建议设为 0,因为这会使得限流器过于严格,无法处理任何瞬时波动。
context.Context
的使用:- 在使用
Wait()
或 WaitN()
时,始终传入一个带有超时或取消功能的 context.Context
。这可以防止 goroutine 无限期阻塞,提高程序的健壮性。 - 对于服务器请求,通常使用请求的
context
。
- 并发安全:
rate.Limiter
是并发安全的,可以在多个 goroutine 中安全地使用同一个 Limiter
实例。 - 性能考虑:
rate.Limiter
的实现是高效的,基于原子操作和时间计算,不会引入显著的性能开销。 - 动态调整速率:
rate.Limiter
提供了 SetLimit()
和 SetBurst()
方法来动态调整限流器的参数。 - 多个限流器: 如果你的应用需要对不同类型的操作进行限流(例如,对 API A 和 API B 有不同的速率限制),应该为每种操作创建独立的
rate.Limiter
实例
最后补一张令牌桶和漏桶算法区别
