1. 前言
操作重试有不同的策略(退避算法),同时操作重试在很场景中都使用到了,比如IP数据包发送(CSMA/CD)、网络通信,RPC服务调用等,有的是用于协调网络传输速率,避免网络拥塞,有的是为了考虑网络波动影响,提高服务可用性;
常见的是指数退避算法,通常起先是基于一个较低的时间间隔尝试操作,若尝试失败,则按指数级的逐步延长事件发生的间隔时间,直至超过最大尝试机会;大多数指数退避算法会利用抖动(随机延迟)来防止连续的冲突。
2. 简单的伪代码
以下是简单的指数退避算的伪代码,其中未考虑随机因子引入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| retries = 0
DO
Do some operation.
// operation result check
status = Get the result of the operation.
IF status = SUCCESS
retry = false
ELSE IF status = FAIL
retry = true
ELSE
Some other error occurred, so stop calling the API.
retry = false
END IF
// retry
IF retry = true
wait for (2^retries * 100) milliseconds
retries = retries + 1
END IF
WHILE (retry AND (retries < MAX_RETRIES))
|
3. Go中的backoff
基于github.com/cenkalti/backoff
包。
退避算法有两块,一块是退避策略,即间隔多久操作下一次(退避策略);另一块是累计可以支持的最大操作次数(重试次数)
3.1. BackOff类型
BackOff是一个接口类型,提供NextBackOff()
下次重试的间隔时间,Reset()
支持退避恢复到初始状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| type BackOff interface {
// NextBackOff returns the duration to wait before retrying the operation,
// or backoff. Stop to indicate that no more retries should be made.
//
// Example usage:
//
// duration := backoff.NextBackOff();
// if (duration == backoff.Stop) {
// // Do not retry operation.
// } else {
// // Sleep for duration and retry operation.
// }
//
NextBackOff() time.Duration
// Reset to initial state.
Reset()
}
|
3.2. 退避策略:指数、常数、零值退避策略
- NewExponentialBackOff() :(失败后,基于指数级别的间隔,再次发起操作;考虑到了网络实际情况)
- ZeroBackOff():零间隔时间退避(失败立马再次发起请求,针对网络抖动,通常会遇到连续失败情况)
- ConstantBackOff(): 相同时间间隔的退避(失败后,间隔一个指定的时间,再次发起操作)
3.3. 重试次数:WithMaxRetries
- WithMaxRetries(b BackOff, max uint64) BackOff:创建一个退避策略,选定一个BackOff类型,并设置最大尝试次数
- Retry(o Operation, b BackOff) error:基于上述设定的退避策略,利用Retry函数,绑定到操作Operation上
1
2
3
4
5
6
| // 操作类型
type Operation func() error
// PermanentError表示不继续重试该操作(比如参数错误、数据异常时候,重试也是徒劳)
type PermanentError struct {
Err error
}
|
3.4. 指数退避算法
- NewExponentialBackOff():创建指数退避类型
- GetElapsedTime() time.Duration:获取自创建退避实例以来经过的时间,在调用
Reset()
时重置。 - NextBackOff() time.Duration:使用公式计算下一个退避间隔
- Reset():可以通过
Reset()
重置退避算法的耗时时间(最大耗时DefaultMaxElapsedTime=15分钟)
指数退避,下次操作(NextBackOff)触发时间间隔计算公式:
1
2
3
| randomized interval =
RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
Randomized interval = RetryInterval +/- (RandomizationFactor * RetryInterval)
|
3.5. Ticker定时器
1
2
3
4
| // 创建一个指定退避策略的定时器,这样当下次退避策略时限到达,将会自动产生一个时间消息到定时器通道
func NewTicker(b BackOff) *Ticker
// 定时器支持停止
func (t *Ticker) Stop()
|
3.6. Context上下文相关(值传递、ctx取消)
1
2
3
4
5
| type BackOffContext interface {
BackOff
Context() context.Context
}
func WithContext(b BackOff, ctx context.Context) BackOffContext
|
4. backoff包示例(RPC调用重试)
当前RPC请求失败时候,进行退避尝试(有最大尝试限制),如果达到最大尝试次数依旧失败,则返回错误;否则,返回正确的RPC调用结果。
tips: 支持修改尝试次数和rpc的成功率修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| // seed
func init() {
rand.Seed(time.Now().UnixNano())
}
// rpc result
type RpcResult struct {
code int
msg string
data interface{}
}
// common rpc func
func doRpc() (*RpcResult, error) {
fmt.Println("try rpc executing... ", time.Now().Format("15:04:05"))
if rand.Intn(10) < int(SuccessProb*10) {
return &RpcResult{100, "result is ok", "ok"}, nil
} else {
return nil, errors.New("rand error")
}
}
// Retry times
var MaxRetry uint64 = 3
var SuccessProb = 0.5
// doRpc with backoff + retry
func DoRpcRetry() (*RpcResult, error) {
rpcrs := new(RpcResult)
opfn := func() (rpcerr error) {
rpcrs, rpcerr = doRpc()
return rpcerr
}
// exponential back off
bkf := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), MaxRetry)
err := backoff.Retry(opfn, bkf)
if err != nil {
return nil, err
}
return rpcrs, err
}
func main() {
data, err := DoRpcRetry()
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("%#v", *data)
}
}
|
5. Timer、Ticker
因为backoff内部有涉及定时重试功能,故顺带说下Timer和Ticker定时器
- Timer(计时器):定时器适用于计划将来做某事的时间,类似于linux的at命令,可以通过timer.Reset(d Duration)重
- Timer类型支持:C <-chan Time、Stop()、Reset(d Duration)
- func NewTimer(d Duration) *Timer
- func After(d Duration) <-chan Time
- func AfterFunc(d Duration, f func()) *Timer
- Ticker(定时器):定时器适用于定期重复执行某些操作的情况,类似于linux的crontab命令
- Ticker类型支持:C <-chan Time、Stop()
- func NewTicker(d Duration) *Ticker
- func Tick(d Duration) <-chan Time
对比不难发现,Tickers使用与计时器Timer类似的机制:都会提供发送值的通道,两则都可以停止(停止ticker可以释放相关的资源)
不同的是:
- timer需要通过Reset函数来重置计时器(time.Reset),同时timer也提供定期消息通知或者执行操作的的功能(time.After、time.AfterFunc)
- ticker支持便捷的持续时钟通道(time.Tick),用于持续性的不需要停止的时钟行为(也不会被GC回收掉)
5.1. Timer示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| func main() {
timer := time.NewTimer(1 * time.Second)
atch := time.After(10 * time.Second)
over := make(chan string)
// after 5 seconds close the timer
go func() {
time.AfterFunc(3*time.Second, func() {
timer.Stop()
})
}()
// go on print time.Timer or stop the goroutine when time after 10 seconds
go func() {
count := 5
for {
select {
case at := <-atch:
fmt.Println(at, "time.After Launch", count)
over <- "over"
return
case t := <-timer.C:
timer.Reset(1 * time.Second)
fmt.Println(t, "time.Timer", count)
if count--; count == 0 {
over <- "over"
return
}
}
}
}()
fmt.Println(<-over)
//fmt.Println(runtime.NumGoroutine())
}
|
5.2. Ticker(定时器)
创建一个定时器倒计时,可以输出任何内容取消!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| func main() {
tick := time.NewTicker(1 * time.Second)
interrupt := make(chan string)
// input anything to interrupt ticker
go func() {
input := bufio.NewScanner(os.Stdin)
for input.Scan() {
tick.Stop()
interrupt <- "over"
}
}()
count := 5
printPoint:
for {
select {
case t := <-tick.C:
fmt.Println(t, count)
if count--; count == 0 {
break printPoint
}
case <-interrupt:
break printPoint
}
}
fmt.Println("over")
}
|
6. 参考