Golang Rate - 令牌桶限流包使用

AI 摘要: Go语言中的golang.org/x/time/rate包是一个基于令牌桶算法的限流器实现,可应用于控制操作速率、防止系统过载以及遵守特定速率限制。令牌桶算法核心包括令牌生成速率、桶容量和请求消耗令牌的流程。在使用rate包时,要选择合适的速率和容量、使用context.Context传入带超时或取消功能的参数、保证并发安全、考虑性能、实现动态调整速率等。

背景

最近在项目开发中涉及 AI 网关的限频调用,以及任务消息消费的动态限速调整,正好都用到了令牌桶,这里简要做下记录

rate 限流器包简介

golang.org/x/time/rate  包是 Go 语言中一个非常实用的限流器实现,它基于令牌桶算法

这个包提供了一种简单而强大的方式来控制操作的速率,防止系统过载,或者在需要遵守特定速率限制(例如,调用第三方 API)时使用。

rate  包的核心是令牌桶算法。想象一个桶,里面装着“令牌”。

  1. 令牌生成速率 (Rate):  令牌以固定的速率(例如,每秒 10 个)被放入桶中。
  2. 桶容量 (Burst):  桶有一个最大容量。如果桶满了,新生成的令牌就会被丢弃。
  3. 请求消耗令牌:  当一个请求到来时,它会尝试从桶中取走一个令牌。
    • 如果桶中有足够的令牌,请求立即被允许执行,并消耗一个令牌。
    • 如果桶中没有足够的令牌,请求必须等待,直到有新的令牌生成并放入桶中。

这种机制确保了在短时间内可以处理突发请求(利用桶中已有的令牌),但长期来看,请求的平均速率不会超过令牌的生成速率。

rate 包方法简要说明

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// - r (rate.Limit): 这是令牌的生成速率,表示每秒允许的事件数
// - b (int): 这是令牌桶的容量(burst),表示桶中最多可以存储多少个令牌。它决定了限流器可以处理的突发请求数量。
func NewLimiter(r Limit, b int) *Limiter

// 非阻塞。如果桶中有足够的令牌,它会消耗一个令牌并返回 true;否则,返回 false。
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

// 阻塞。如果桶中没有令牌,它会阻塞当前 goroutine,等待直到有令牌可用。
// ctx 参数:
// - 如果 ctx 被取消(例如,context.WithCancel 或 context.WithTimeout),Wait 会立即返回 ctx.Err()。
// - 如果 ctx 为 nil,则不会有超时或取消,Wait 会一直等待直到令牌可用。
func (lim *Limiter) Wait(ctx context.Context) error
func (lim *Limiter) WaitN(ctx context.Context, n int) error

// 这些方法用于预留令牌,但不会立即消耗。
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

// 动态调整速率
limiter.SetLimit(rate.Limit(20)) // 调整为每秒20个
limiter.SetBurst(10)             // 调整容量为10

简单代码示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 创建一个限流器:每秒允许10个事件,桶容量为5
	// 这意味着:
	// - 长期平均速率不会超过每秒10个请求。
	// - 短时间内,可以处理最多5个突发请求(如果桶是满的)。
	limiter := rate.NewLimiter(rate.Limit(10), 5)

	fmt.Printf("Limiter created: Rate=%v, Burst=%v\n", limiter.Limit(), limiter.Burst())

	// 尝试获取令牌
	// Allow() 是非阻塞的,消耗一个token,立即返回是否允许
	if limiter.Allow() {
		fmt.Println("Request 1 allowed immediately.")
	} else {
		fmt.Println("Request 1 denied immediately.")
	}

	// Wait() 是阻塞的,直到获取到令牌
	fmt.Println("Waiting for a token...")
	err := limiter.Wait(nil) // nil context表示没有超时或取消
	if err != nil {
		fmt.Printf("Wait error: %v\n", err)
	} else {
		fmt.Println("Request 2 allowed after waiting (if necessary).")
	}
}

使用令牌桶的常见场景

API 限速调用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 限制对某个第三方API的调用,例如每秒100次
apiLimiter := rate.NewLimiter(rate.Limit(100), 100)

func callExternalAPI() {
    if err := apiLimiter.Wait(context.Background()); err != nil {
        fmt.Println("Failed to acquire token for API call:", err)
        return
    }
    // 实际调用API的逻辑
    fmt.Println("Calling external API...")
}

// 在goroutine中并发调用
for i := 0; i < 200; i++ {
    go callExternalAPI()
}

防止服务过载(中间件,单节点限流)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// 限制服务器处理请求的速率,例如每秒500个请求
requestLimiter := rate.NewLimiter(rate.Limit(500), 500)

func handleRequest(req interface{}) {
    if !requestLimiter.Allow() {
        // 请求过多,直接拒绝或返回错误
        fmt.Println("Request denied: Too many requests.")
        return
    }
    // 处理请求的逻辑
    fmt.Println("Processing request...")
}

// 在HTTP服务器的Handler中
// http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
//     if !requestLimiter.Allow() {
//         http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
//         return
//     }
//     // ... 处理请求 ...
// })

控制后台任务执行速率

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 限制后台任务的执行速率,例如每分钟10个任务
taskLimiter := rate.NewLimiter(rate.Every(time.Minute/10), 1)

func processTask(taskID int) {
    fmt.Printf("Processing task %d...\n", taskID)
    time.Sleep(50 * time.Millisecond) // 模拟任务耗时
}

func main() {
    fmt.Println("\n--- Background Task Limiting ---")
    for i := 0; i < 20; i++ {
        // 阻塞等待令牌,确保任务不会过快执行
        if err := taskLimiter.Wait(context.Background()); err != nil {
            fmt.Printf("Error waiting for task %d: %v\n", i, err)
            continue
        }
        go processTask(i)
    }
    time.Sleep(2 * time.Minute) // 等待所有任务完成
}

注意事项和最佳实践

  1. 选择合适的速率和容量:
    • 速率 (Rate):  根据你的业务需求和外部服务的限制来设定。
    • 容量 (Burst):  决定了你的系统能承受多大的瞬时流量冲击。容量越大,能处理的突发请求越多,但也会导致在突发情况下消耗更多资源。通常,容量应该至少等于速率,或者略大于速率,以应对短时间的流量高峰。
    • 如果  burst  设置为  0,则表示没有突发能力,每次请求都必须严格按照速率来。但通常不建议设为 0,因为这会使得限流器过于严格,无法处理任何瞬时波动。
  2. context.Context  的使用:
    • 在使用  Wait()  或  WaitN()  时,始终传入一个带有超时或取消功能的  context.Context。这可以防止 goroutine 无限期阻塞,提高程序的健壮性。
    • 对于服务器请求,通常使用请求的  context
  3. 并发安全: rate.Limiter  是并发安全的,可以在多个 goroutine 中安全地使用同一个  Limiter  实例。
  4. 性能考虑: rate.Limiter  的实现是高效的,基于原子操作和时间计算,不会引入显著的性能开销。
  5. 动态调整速率: rate.Limiter  提供了  SetLimit()  和  SetBurst()  方法来动态调整限流器的参数。
  6. 多个限流器:  如果你的应用需要对不同类型的操作进行限流(例如,对 API A 和 API B 有不同的速率限制),应该为每种操作创建独立的  rate.Limiter  实例

最后补一张令牌桶和漏桶算法区别

令牌桶和漏桶算法区别