Golang - Backoff+Ticker+Timer示例

AI 摘要: 本文介绍了操作重试的策略,包括指数退避算法和抖动机制,以及在网络传输和RPC服务中的应用。

1. 前言

操作重试有不同的策略(退避算法),同时操作重试在很场景中都使用到了,比如IP数据包发送(CSMA/CD)、网络通信,RPC服务调用等,有的是用于协调网络传输速率,避免网络拥塞,有的是为了考虑网络波动影响,提高服务可用性;

常见的是指数退避算法,通常起先是基于一个较低的时间间隔尝试操作,若尝试失败,则按指数级的逐步延长事件发生的间隔时间,直至超过最大尝试机会;大多数指数退避算法会利用抖动(随机延迟)来防止连续的冲突。

2. 简单的伪代码

以下是简单的指数退避算的伪代码,其中未考虑随机因子引入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
retries = 0
DO
    Do some operation.
    // operation result check
    status = Get the result of the operation.
    IF status = SUCCESS
        retry = false
    ELSE IF status = FAIL
        retry = true
    ELSE
        Some other error occurred, so stop calling the API.
        retry = false
    END IF
    // retry 
    IF retry = true
        wait for (2^retries * 100) milliseconds
        retries = retries + 1
    END IF
WHILE (retry AND (retries < MAX_RETRIES))

3. Go中的backoff

基于github.com/cenkalti/backoff包。

退避算法有两块,一块是退避策略,即间隔多久操作下一次(退避策略);另一块是累计可以支持的最大操作次数(重试次数)

3.1. BackOff类型

BackOff是一个接口类型,提供NextBackOff()下次重试的间隔时间,Reset()支持退避恢复到初始状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type BackOff interface {
    // NextBackOff returns the duration to wait before retrying the operation,
    // or backoff. Stop to indicate that no more retries should be made.
    //
    // Example usage:
    //
    //  duration := backoff.NextBackOff();
    //  if (duration == backoff.Stop) {
    //      // Do not retry operation.
    //  } else {
    //      // Sleep for duration and retry operation.
    //  }
    //
    NextBackOff() time.Duration

    // Reset to initial state.
    Reset()
}

3.2. 退避策略:指数、常数、零值退避策略

  • NewExponentialBackOff() :(失败后,基于指数级别的间隔,再次发起操作;考虑到了网络实际情况)
  • ZeroBackOff():零间隔时间退避(失败立马再次发起请求,针对网络抖动,通常会遇到连续失败情况)
  • ConstantBackOff(): 相同时间间隔的退避(失败后,间隔一个指定的时间,再次发起操作)

3.3. 重试次数:WithMaxRetries

  • WithMaxRetries(b BackOff, max uint64) BackOff:创建一个退避策略,选定一个BackOff类型,并设置最大尝试次数
  • Retry(o Operation, b BackOff) error:基于上述设定的退避策略,利用Retry函数,绑定到操作Operation上
1
2
3
4
5
6
// 操作类型
type Operation func() error
// PermanentError表示不继续重试该操作(比如参数错误、数据异常时候,重试也是徒劳)
type PermanentError struct {
    Err error
}

3.4. 指数退避算法

  • NewExponentialBackOff():创建指数退避类型
  • GetElapsedTime() time.Duration:获取自创建退避实例以来经过的时间,在调用Reset()时重置。
  • NextBackOff() time.Duration:使用公式计算下一个退避间隔
  • Reset():可以通过Reset()重置退避算法的耗时时间(最大耗时DefaultMaxElapsedTime=15分钟)

指数退避,下次操作(NextBackOff)触发时间间隔计算公式:

1
2
3
randomized interval = 
    RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
Randomized interval = RetryInterval +/- (RandomizationFactor * RetryInterval)

3.5. Ticker定时器

1
2
3
4
// 创建一个指定退避策略的定时器,这样当下次退避策略时限到达,将会自动产生一个时间消息到定时器通道
func NewTicker(b BackOff) *Ticker
// 定时器支持停止
func (t *Ticker) Stop()

3.6. Context上下文相关(值传递、ctx取消)

1
2
3
4
5
type BackOffContext interface {
    BackOff
    Context() context.Context
}
func WithContext(b BackOff, ctx context.Context) BackOffContext

4. backoff包示例(RPC调用重试)

当前RPC请求失败时候,进行退避尝试(有最大尝试限制),如果达到最大尝试次数依旧失败,则返回错误;否则,返回正确的RPC调用结果。

tips: 支持修改尝试次数和rpc的成功率修改

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// seed
func init() {
    rand.Seed(time.Now().UnixNano())
}

// rpc result
type RpcResult struct {
    code int
    msg  string
    data interface{}
}

// common rpc func
func doRpc() (*RpcResult, error) {

    fmt.Println("try rpc executing... ", time.Now().Format("15:04:05"))

    if rand.Intn(10) < int(SuccessProb*10) {
        return &RpcResult{100, "result is ok", "ok"}, nil
    } else {
        return nil, errors.New("rand error")
    }
}

// Retry times
var MaxRetry uint64 = 3
var SuccessProb = 0.5

// doRpc with backoff + retry
func DoRpcRetry() (*RpcResult, error) {

    rpcrs := new(RpcResult)

    opfn := func() (rpcerr error) {
        rpcrs, rpcerr = doRpc()
        return rpcerr
    }

    // exponential back off
    bkf := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), MaxRetry)
    err := backoff.Retry(opfn, bkf)
    if err != nil {
        return nil, err
    }
    return rpcrs, err
}

func main() {
    data, err := DoRpcRetry()
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Printf("%#v", *data)
    }
}

5. Timer、Ticker

因为backoff内部有涉及定时重试功能,故顺带说下Timer和Ticker定时器

  • Timer(计时器):定时器适用于计划将来做某事的时间,类似于linux的at命令,可以通过timer.Reset(d Duration)重
    • Timer类型支持:C <-chan Time、Stop()、Reset(d Duration)
    • func NewTimer(d Duration) *Timer
    • func After(d Duration) <-chan Time
    • func AfterFunc(d Duration, f func()) *Timer
  • Ticker(定时器):定时器适用于定期重复执行某些操作的情况,类似于linux的crontab命令
    • Ticker类型支持:C <-chan Time、Stop()
    • func NewTicker(d Duration) *Ticker
    • func Tick(d Duration) <-chan Time

对比不难发现,Tickers使用与计时器Timer类似的机制:都会提供发送值的通道,两则都可以停止(停止ticker可以释放相关的资源)

不同的是:

  • timer需要通过Reset函数来重置计时器(time.Reset),同时timer也提供定期消息通知或者执行操作的的功能(time.After、time.AfterFunc)
  • ticker支持便捷的持续时钟通道(time.Tick),用于持续性的不需要停止的时钟行为(也不会被GC回收掉)

5.1. Timer示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func main() {

    timer := time.NewTimer(1 * time.Second)
    atch := time.After(10 * time.Second)
    over := make(chan string)

    // after 5 seconds close the timer
    go func() {
        time.AfterFunc(3*time.Second, func() {
            timer.Stop()
        })
    }()

    // go on print time.Timer or stop the goroutine when time after 10 seconds
    go func() {
        count := 5
        for {
            select {
            case at := <-atch:
                fmt.Println(at, "time.After Launch", count)
                over <- "over"
                return
            case t := <-timer.C:
                timer.Reset(1 * time.Second)
                fmt.Println(t, "time.Timer", count)
                if count--; count == 0 {
                    over <- "over"
                    return
                }
            }
        }
    }()

    fmt.Println(<-over)
    //fmt.Println(runtime.NumGoroutine())
}

5.2. Ticker(定时器)

创建一个定时器倒计时,可以输出任何内容取消!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {

    tick := time.NewTicker(1 * time.Second)
    interrupt := make(chan string)

    // input anything to interrupt ticker
    go func() {
        input := bufio.NewScanner(os.Stdin)
        for input.Scan() {
            tick.Stop()
            interrupt <- "over"
        }
    }()

    count := 5
printPoint:
    for {
        select {
        case t := <-tick.C:
            fmt.Println(t, count)
            if count--; count == 0 {
                break printPoint
            }
        case <-interrupt:
            break printPoint
        }
    }

    fmt.Println("over")
}

6. 参考