Go Sync - 并发同步控制相关包解析

1. Go并发相关包

  1. sync
  2. sync/atomic
  3. golang.org/x/sync: 提供了sync和 sync/atomic 额外的一些并发原语

2. 标准库 sync

**锁接口: **

type Locker interface {
	Lock()
	Unlock()
}

2.1. sync.Cond 条件

Cond 实现了一个条件变量,一个等待或宣布事件发生的 goroutines 的集合点。每个 Cond 都有一个关联的 Locker L(通常是 *Mutex*RWMutex),在更改条件和调用等待方法。

第一次使用后不得复制 Cond。

type Cond struct {
	// L is held while observing or changing the condition
	L Locker
}

// 初始化
func NewCond(l Locker) *Cond

// 广播唤醒所有等待 c 的 goroutine。
func (c *Cond) Broadcast()

// 发射信号,唤醒一个等待 c 的 goroutine(如果有的话)
func (c *Cond) Signal()

// 等待原子地解锁 c.L 并暂停调用 goroutine 的执行。
// 稍后恢复执行后,Wait 在返回之前锁定 c.L。
// 与其他系统不同,Wait 不能返回,除非被 `Broadcast` 或 `Signal` 唤醒。
//  因为当 Wait 第一次恢复时 c.L 没有被锁定,调用者通常不能假设 Wait 返回时条件为真。相反,调用者应该在循环中等待:
func (c *Cond) Wait()

2.1.1. sync.Cond 的操作

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

2.2. sync.Map 并发map访问

Map 类似于 Go 的 map[interface{}]interface{},但可以安全地被多个 goroutine 并发使用,而无需额外的锁定或协调

加载、存储和删除以摊销的常数时间运行。

大多数代码应该使用普通的 Go Map来代替,使用单独的锁定或协调,以获得更好的类型安全性并更容易维护

sync.Map 类型针对两个常见用例进行了优化:

  1. 当给定 key 只被写入一次,但读取多次,例如在只增长的缓存中
  2. 当多个 goroutine 并发的读取、写入和覆盖不相交 key 的时候

在这两种情况下,与使用单独的 Mutex 或 RWMutex 配对的 Go map 相比,使用 sync.Map 可以显着减少锁争用, sync.Map的是零值是空的并且可以直接使用,另外注意首次使用后不得复制sync.Map

2.2.1. sync.Map操作

type Map struct {
	// contains filtered or unexported fields
}

// sync.Map 删除key
func (m *Map) Delete(key any)

// sync.Map 返回key对应的值,不存在的值返回nil,ok 表示是否在sync.Map找到值,即是否key存在
func (m *Map) Load(key any) (value any, ok bool)

// 返回key存在的值,并删除掉该key,loaded返回是否key存在
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)

// 存储key当前值,若值能Load出来 => true,值Load不出来返回false
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)

// 通过f 迭代sync.Map的值,range时候非某一时刻sync.Map的快照,即若存在其他 goroutine 并发Store,则可能在range时刻返回出来
func (m *Map) Range(f func(key, value any) bool)

// 存储key对应的val
func (m *Map) Store(key, value any)

2.3. sync.Once 一次执行

Once 是一个将执行一个操作的对象。A Once 在第一次使用后不能被复制。

如果 once.Do(f) 被多次调用,只有第一次调用会调用 f,即使 f 在每次调用中都有不同的值。

执行每个函数都需要一个新的 Once 实例,Do 旨在用于必须只运行一次的初始化,由于 f 是 niladic,因此可能需要使用函数文字来捕获要由 Do 调用的函数的参数

在 f 内继续调用 once.Do() 则会死锁

config.once.Do(func() { config.init(filename) })

2.4. sync.Pool 池化管理

一组可以单独保存和检索的临时对象

sync.Pool的主要目的是缓存已分配但未使用的项目以供以后重用,减轻垃圾收集器的压力,也就是说,它使构建高效、线程安全的空闲列表变得容易,但是,它并不适用于所有空闲列表。(类似均摊算法)

  1. Pool 可以安全地同时被多个 goroutine 使用 (并发安全)
  2. Pool 通过管理一可能被重用的临时项目,提供了一种在许多客户端之间分摊分配开销的方法。

注意:存储在池中的任何项目(item)都可能随时自动删除,不会做另行通知;若删除时候发现,仅 Pool 拥有item的唯一的引用,则该item项目可能会被释放(资源回收)

一个很好地使用池的例子是在 fmt 包中,它维护一个动态大小的临时输出缓冲区存储,存储在负载下扩展(当许多 goroutine 正在积极打印时)并在休眠的时候缩小(资源回收)。 另一方面,作为短期对象的一部分维护的空闲列表不适合用 Pool 池化,因为开销不会摊销在那种情况下。让这些对象实现它们自己的空闲列表更有效, (临时对象重用性不高的不需要池化)

2.4.1. sync.Pool 方法

Get 从 Pool 中选择任意项目,将其从 Pool 中删除,并将其返回给调用者,调用者不应假定传递给 Put 的值与 Get 返回的值之间存在任何关系。

如果 p.Get() 返回 nil ,且 p.New() 不为零,则 p.Get() 返回 p.New() 的结果

type Pool struct {
    // New()指定一个函数生成一个值,注意该值不能和Get()同时并发修改
	New func() any
}

// 从池中获取资源
pool.Get() any

// 将资源放回池
pool.Put(x any)

可以参考Mysql Pool各类客户端包在池化方面的使用

2.5. sync.Mutex 互斥锁 (读写均会阻塞)

mu.Lock(): 锁定rw用于写入,其他G来读、写均会阻塞
mu.UnLock(): 解写锁
mu.TryLock() bool: 尝试加读写锁

2.6. sync.RWMutex 并发读写互斥 (支持并发读,在读多写少场景适用)

RWMutex 是读写器互斥锁,锁可以由任意数量的读取器单个写入器持有,RWMutex的零值是未锁定的互斥锁。

注意: RWMutex 在首次使用后不得复制。

如果一个 goroutine 持有一个用于读取的 RWMutex,而另一个 goroutine 可能会调用 Lock,那么在初始读取锁被释放之前,任何 goroutine 都不应该期望能够获取读取锁。特别是,这禁止递归读锁定。这是为了确保锁最终可用;阻塞的 Lock 调用会阻止新读者获取锁。

G1 -> 获取 rwmu,用于读取数据
G2 -> 获取 rwmu,可以加锁Lock()
G2..GN -> 获取 rwmu,想获取RLock() 失败,直到G2调用 UnLock()释放锁


rwmu.Lock(): 锁定rw用于写入,其他G来读、写均会阻塞
rwmu.UnLock(): 解写锁

rwmu.RLock(): 锁定rw用于读取,其他G可以并发读取,但写入会阻塞
rwmu.RUnlock(): 解读锁

rwmu.TryLock() bool: 尝试加读写锁
rwmu.TryRLock() bool: 尝试加读锁 

rwmu.RLocker(): 或取读锁

2.7. sync.WaitGroup 等待

WaitGroup 等待一组 goroutine 完成

  1. goroutine 调用 wg.Add() 来设置要等待的 goroutine 的数量。
  2. 然后,每个 goroutine 运行并在完成时调用 wg.Done()
  3. 同时,wg.Wait() 可以用来阻塞,直到所有的 goroutine 都完成
package main

import (
	"sync"
)

type httpPkg struct{}

func (httpPkg) Get(url string) {}

var http httpPkg

func main() {
	var wg sync.WaitGroup
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.example.com/",
	}
	for _, url := range urls {
		// Increment the WaitGroup counter.
		wg.Add(1)
		// Launch a goroutine to fetch the URL.
		go func(url string) {
			// Decrement the counter when the goroutine completes.
			defer wg.Done()
			// Fetch the URL.
			http.Get(url)
		}(url)
	}
	// Wait for all HTTP fetches to complete.
	wg.Wait()
}

3. 标准库 sync/atomic

atomic 提供了用于实现同步算法的低级原子内存原语, 这些函数需要非常小心才能正确使用。

除了特殊的低级应用程序外,最好使用通道sync 同步包的工具来完成同步。

第一性原则: 通过通信共享内存,不要通过共享内存进行通信。

3.1. atomic 包使用

// Add 以原子方式将 delta 添加到 *addr 并返回新值
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

// CAS比较交换
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

// 加载返回
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)

// 存储设置
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)

// 交换
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

// atomic.Value{} 类型
type Value
    func (v *Value) CompareAndSwap(old, new any) (swapped bool)
    func (v *Value) Load() (val any)
    func (v *Value) Store(val any)
    func (v *Value) Swap(new any) (old any)

3.1.1. atomic 方法对应的基本含义

// Add
*addr += delta
return *addr

// CAS
if *addr == old {
	*addr = new
	return true
}
return false

// Swap
old = *addr
*addr = new
return old

// Store
*addr = val

// Load
return *addr

3.2. aotmic 使用Case

3.2.1. Value(Config) - 一个定期Reload更新Store,多个并发Load读取配置的实现

import (
	"sync/atomic"
	"time"
)

// 加载配置
func loadConfig() map[string]string {
	return make(map[string]string)
}

// 
func requests() chan int {
	return make(chan int)
}

func main() {
    // 存储系统配置
	var config atomic.Value 

	// 初始化系统配置,加载并存储
	config.Store(loadConfig())

    // 一个定期Reload Config的协程
	go func() {
		for {
			time.Sleep(10 * time.Second)
			config.Store(loadConfig())
		}
	}()
.
    // 模拟多个协程并发处理接收的请求,并发的Load()配置
	for i := 0; i < 10; i++ {
		go func() {
			for r := range requests() {
				c := config.Load()
				// Handle request r using config c.
				_, _ = r, c
			}
		}()
	}
}

3.2.2. Value(ReadMostly) - 读多写少,采用写时复制

相比直接map操作,atomic下面的写时复制,锁冲突粒度更小,性能更高

import (
	"sync"
	"sync/atomic"
)

func main() {

    // 
	type Map map[string]string

    // 初始化atomic.Value,并存储Map
	var m atomic.Value
	m.Store(make(Map))

    // 写锁
	var mu sync.Mutex // used only by writers

    // 从存储中读取数据,无需进一步再做同步处理
	read := func(key string) (val string) {
		m1 := m.Load().(Map)
		return m1[key]
	}

	// insert function can be used to update the data without further synchronization
    // 写入函数,针对Map并发写,加入
	insert := func(key, val string) {
        // mu加锁
		mu.Lock() // synchronize with other potential writers
		defer mu.Unlock()

        // 原子读取
		m1 := m.Load().(Map) // load current value of the data structure
  
        // 写时复制,copy到一个新的map m2内
		m2 := make(Map)      // create a new value
		for k, v := range m1 {
			m2[k] = v // copy all data from the current object to the new one
		}

        // 针对m2 做一些更新
		m2[key] = val // do the update that we need

        // 重新设置m2到 atomic内 (锁冲突的时间很短)
		m.Store(m2)   // atomically replace the current object with the new one

        // 这个点后的新读取,都读到新的值
		// At this point all new readers start working with the new version.
		// The old version will be garbage collected once the existing readers
		// (if any) are done with it.
	}
	_, _ = read, insert
}

4. golang.org/sync 使用

// import
go get -u golang.org/x/sync

4.1. errgroup

errgroup 为处理公共任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。

Group 是一组 goroutines,它们处理属于同一整体任务的子任务Group 零值是有效的,对活动 goroutines 的数量没有限制,并且不会因错误而取消。

type Group
    // WithContext 返回一个新组和一个从 ctx 派生的关联上下文。
    func WithContext(ctx context.Context) (*Group, context.Context)

    // 在新的 goroutine 中调用给定的函数。它一直阻塞,直到可以添加新的 goroutine 而组中的活动 goroutine 数量不超过配置的限制。
    func (g *Group) Go(f func() error)

    // 并发度限制,负值表示没有限制
    // SetLimit 将这个组中的活动 goroutine 的数量限制为最多 n,当组中的任何 goroutine 处于活动状态时,不得修改限制。
    // 任何后续调用 Group.Go 方法都会阻塞,直到它可以添加一个活动的 goroutine 而不会超过配置的限制。
    func (g *Group) SetLimit(n int)

    // 并发度限制,TryGo不超过limit限制
    // TryGo 仅当组中的活动 goroutine 数量当前低于配置的Limit n 限制时,才会在新的 goroutine 中调用给定函数。
    // 返回值报告 goroutine 是否已启动。
    func (g *Group) TryGo(f func() error) bool

    // g.Wati等待阻塞直到来自 Go 方法的所有函数调用都返回,然后从它们返回第一个非零错误(如果有)。
    func (g *Group) Wait() error

4.1.1. errgroup Example - JustErrors 简化 sync.WaitGroup 计数处理

JustErrors 说明了使用 errgroup.Group 代替 sync.WaitGroup 来简化 goroutine 计数和错误处理。此示例源自 https://golang.org/pkg/sync/#example_WaitGroup 上的 sync.WaitGroup 示例。

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {

    // 初始化errgroup.Group
	g := new(errgroup.Group)

    // 并发请求
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
    
	for _, url := range urls {
        // 这是因为循环的每次迭代都使用变量 `url` 的相同实例,因此每个闭包共享该单个变量。
        // 当闭包go func(){} 运行时,内部的url 可能在 goroutine 启动后已被修改
        // 为了帮助在这些问题和其他问题发生之前发现它们,运行 go vet (Vet 检查 Go 源代码并报告可疑结构, `go doc cmd/vet` )
		url := url // 闭包问题,需要重新赋值,或者是将变量作为参数传递给闭包(推荐) https://golang.org/doc/faq#closures_and_goroutines

        // 闭包函数, Launch a goroutine to fetch the URL.
		g.Go(func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}

            // 第一个返回 非 `nil` 错误会取消组Group,其错误将由 Wait 返回 (若有错想继续执行,则需要返回nil)
			return err 
		})
	}

	// 等待所有HTTP fetch返回完成,若内部有一个错误,group被cancel
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}

4.1.2. errgroup Example - Parallel 简化并行任务

Parallel 说明了使用 Group 来同步一个简单的并行任务:来自 https://talks.golang.org/2012/concurrency.slide#46 的“Google Search 2.0”功能,增加了上下文和错误处理。

package main

import (
	"context"
	"fmt"
	"os"

	"golang.org/x/sync/errgroup"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

// 结果类型支持搜索
type Result string
type Search func(ctx context.Context, query string) (Result, error)

// fake搜索函数
func fakeSearch(kind string) Search {

    // fakesearch func
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func main() {
    // Google函数,建设内容并返回结果
	Google := func(ctx context.Context, query string) ([]Result, error) {
        // errgroup 带ctx 初始化
        g, ctx := errgroup.WithContext(ctx)

        // 初始化搜到和结果
		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
			
            // 利用g.Go()开启协程执行并发搜索
            g.Go(func() error {
				result, err := search(ctx, query) // 执行fakesearch
				if err == nil {
					results[i] = result
				}

                // 有错直接返回
				return err
			})
		}

        // g.Wait()阻塞等待
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

    // 检索golang
	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

    // 输出Google结果
	for _, result := range results {
		fmt.Println(result)
	}

}

4.1.3. errgroup Example - Pipeline

Pipeline 演示了使用 Group 来实现多阶段管道:来自 https://blog.golang.org/pipelines 的具有有限并行性的 MD5All 函数版本。

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All读取在root下的所有文件,返回所有文件的md5 sum值,如果路径walk失败,则返回一个错误
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.

    // 当g.Wait()返回时候,ctx被取消,所有groutine结束可以被GC回收
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

    // 第一个Goroutine用于遍历path下的文件,输入到paths chan内
	g.Go(func() error {
        // 关闭 chan
		defer close(paths)

        // filepath.Walk() 迭代文件路径
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path: // 有路径,放入paths chan
			case <-ctx.Done():  // ctx完成,返回ctx.Err
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
    // 开启另外一系列Groutine,来读取文件,并计算md5摘要 -> 并发操作
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths { // 迭代文件path信息,读取文件,计算md5摘要
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

    // 等待摘要所有Group的Groutine处理完成,再关闭result通道
	go func() {
		g.Wait()
		close(c)
	}()

    // 迭代result通道,将数据记录到map,并返回
	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}

    // 检查是否有任何 goroutines 失败,由于 g 正在累积错误,我们不需要独自发送它们(或检查它们)在通道上发送的结果
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

4.2. semaphore

包 semaphore 提供加权信号量实现

4.3. singlefight - singlefight.Group

包 singleflight 提供了重复函数调用抑制机制。

// 组代表一类工作,并形成一个命名空间,其中工作单元可以通过重复抑制来执行。
type Group struct {
	// contains filtered or unexported fields
}

// Do 执行并返回给定函数的结果,确保一次只针对给定键执行一次执行。
// 如果出现key重复,则重复调用者会等待原始调用者完成并接收相同的结果。
//  返回值 shared 指示是否将 v 提供给多个调用者。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

// DoChan 与 Do 类似,但返回一个通道,当结果就绪时将接收结果。
// 返回的通道不会关闭。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

// 通道内消息的结果结构体类型
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

// Forget 告诉 singleflight 忘记一个键,
//  未来对该键的 Do 调用将调用该函数,而不是等待较早的调用完成。
func (g *Group) Forget(key string)

4.4. syncmap

包 syncmap 提供了一个并发映射实现。

5. 参考

  1. golang.org/x/sync: https://pkg.go.dev/golang.org/x/sync@v0.0.0-20220601150217-0de741cfad7f
  2. pkg.go.dev/sync/atomi
  3. pkg.go.dev/sync