Go - Sync同步原语相关包(1) - Errgroup

AI 摘要: errgroup是golang.org/x/sync包中的一个模块,用于管理多个goroutine的错误处理和协同工作。

https://pkg.go.dev/golang.org/x/sync/errgroup

1. errgroup

1.1. 示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func TestEgroupA(t *testing.T) {
	// errgroup 部分
	egroup := errgroup.Group{}
	
	// 控制并发度
	egroup.SetLimit(3)

	// 启动协程并行工作
	for i := 0; i < 5; i++ {
		i := i // 特别注意,闭包问题
		egroup.Go(func() error {
			t.Logf("#%d", i)
			return nil
		})
	}

  // 首个非nil错误
	if err := egroup.Wait(); err != nil {
		t.Errorf("egroup got err: %s", err)
	}

	t.Logf("done")
}

func TestEgroupB(t *testing.T) {
	a, b, c := "a", "b", "c"
	list := []*string{&a, &b, &c}
	// errgroup 部分
	egroup := errgroup.Group{}
	for k, v := range list {
		k, v := k, v // 特别注意,闭包问题
		egroup.Go(func() error {
			t.Logf("#%d, v=>%v, val v=>%v", k, v, *v)
			return nil
		})
	}

	if err := egroup.Wait(); err != nil {
		t.Errorf("egroup got err: %s", err)
	}

	t.Logf("done")
}

1.2. 包安装

1
2
// 注意低版本的errgroup,不带SetLimit(n)并发度控制,注意更新
go get -u golang.org/x/sync/errgroup

1.3. 包说明

包errgroup为一组协程提供了同步、错误增值、上下文Ctx Cancel 完成同一个任务。

  1. 实际上errgroup.Group实际上是封装了标准库的 sync.WaitGroup,即封装了标准的wg.Add()go func()defer wg.Done()sema协程并发
  2. errgroup.Group的零值就可用,零值没有限制活跃的Goroutine数量,子任务遇到error时候不会取消cancel整体的并发协程
  3. 并发度控制通过 SetLimit(n) 限制,如果n为负数则表示无限制,当运行的egroup内的协程已超过达到n,后续会阻塞直至有协程完成释放出sema信号量。但注意egroup不能在group已运行的情况下再去设置,这个时候会报panic错误 → 如果防御编程,可以酌情使用recover()
  4. TryGo()有点类似非阻塞概念,主要应用在有g.sem信号控制情况,如果没有SetLimit(n)操作,即g.sem=nil,默认不阻塞协程;快速返回false启动协程失败(返回值false表示TryGo执行失败,返回true表示TryGo协程启动成功)
  5. 特别注意: 闭包问题

1.4. 代码解析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Group
type Group struct {
	cancel func() // 初始化零值 cancel为nil,则
	wg sync.WaitGroup
	sem chan token // 初始化零值时候,sem为nil,在egroup.Go(func..)就没有了并发限制
	errOnce sync.Once
	err     error
}

// 零值
var group *errgroup.Group

// 带ctx上下文,用于超时取消一组group(将cancel设置,用于后续cancel)
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

// 等待所有的Go方法返回,然后返回首个非nil错误
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

// 基于给定的func开启并发协程,标准的wg.Add()、go func()、defer wg.Done()、sema协程并发
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

// 控制并发度,即通过缓存chan实现
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}

// TryGo有点类似非阻塞概念,主要应用在有g.sem信号控制情况,如果没有SetLimit(n)操作,默认不阻塞协程,
// 快速返回false启动协程失败
// 返回值false表示TryGo执行失败,返回true表示TryGo协程启动成功
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}:
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
	return true
}