1. Go并发相关包
sync
sync/atomic
golang.org/x/sync
: 提供了sync和 sync/atomic
额外的一些并发原语
2. 标准库 sync
包
**锁接口: **
1
2
3
4
| type Locker interface {
Lock()
Unlock()
}
|
2.1. sync.Cond 条件
Cond
实现了一个条件变量,一个等待或宣布事件发生的 goroutines 的集合点。每个 Cond 都有一个关联的 Locker L(通常是 *Mutex
或 *RWMutex
),在更改条件和调用等待方法。
第一次使用后不得复制 Cond。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| type Cond struct {
// L is held while observing or changing the condition
L Locker
}
// 初始化
func NewCond(l Locker) *Cond
// 广播唤醒所有等待 c 的 goroutine。
func (c *Cond) Broadcast()
// 发射信号,唤醒一个等待 c 的 goroutine(如果有的话)
func (c *Cond) Signal()
// 等待原子地解锁 c.L 并暂停调用 goroutine 的执行。
// 稍后恢复执行后,Wait 在返回之前锁定 c.L。
// 与其他系统不同,Wait 不能返回,除非被 `Broadcast` 或 `Signal` 唤醒。
// 因为当 Wait 第一次恢复时 c.L 没有被锁定,调用者通常不能假设 Wait 返回时条件为真。相反,调用者应该在循环中等待:
func (c *Cond) Wait()
|
2.1.1. sync.Cond 的操作
1
2
3
4
5
6
| c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
|
2.2. sync.Map 并发map访问
Map 类似于 Go 的 map[interface{}]interface{}
,但可以安全地被多个 goroutine 并发使用,而无需额外的锁定或协调。
加载、存储和删除以摊销的常数时间运行。
大多数代码应该使用普通的 Go Map来代替,使用单独的锁定或协调,以获得更好的类型安全性并更容易维护。
sync.Map
类型针对两个常见用例进行了优化:
- 当给定
key
只被写入一次,但读取多次,例如在只增长的缓存中 - 当多个
goroutine
并发的读取、写入和覆盖不相交 key 的时候
在这两种情况下,与使用单独的 Mutex 或 RWMutex 配对的 Go map 相比,使用 sync.Map
可以显着减少锁争用, sync.Map
的是零值是空的并且可以直接使用,另外注意首次使用后不得复制sync.Map
2.2.1. sync.Map操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| type Map struct {
// contains filtered or unexported fields
}
// sync.Map 删除key
func (m *Map) Delete(key any)
// sync.Map 返回key对应的值,不存在的值返回nil,ok 表示是否在sync.Map找到值,即是否key存在
func (m *Map) Load(key any) (value any, ok bool)
// 返回key存在的值,并删除掉该key,loaded返回是否key存在
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
// 存储key当前值,若值能Load出来 => true,值Load不出来返回false
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)
// 通过f 迭代sync.Map的值,range时候非某一时刻sync.Map的快照,即若存在其他 goroutine 并发Store,则可能在range时刻返回出来
func (m *Map) Range(f func(key, value any) bool)
// 存储key对应的val
func (m *Map) Store(key, value any)
|
2.3. sync.Once 一次执行
Once 是一个将执行一个操作的对象。A Once 在第一次使用后不能被复制。
如果 once.Do(f)
被多次调用,只有第一次调用会调用 f,即使 f 在每次调用中都有不同的值。
执行每个函数都需要一个新的 Once
实例,Do 旨在用于必须只运行一次的初始化,由于 f 是 niladic,因此可能需要使用函数文字来捕获要由 Do 调用的函数的参数
在 f 内继续调用 once.Do()
则会死锁
1
| config.once.Do(func() { config.init(filename) })
|
2.4. sync.Pool 池化管理
一组可以单独保存和检索的临时对象
sync.Pool
的主要目的是缓存已分配但未使用的项目以供以后重用,减轻垃圾收集器的压力,也就是说,它使构建高效、线程安全的空闲列表变得容易,但是,它并不适用于所有空闲列表。(类似均摊算法)
Pool
可以安全地同时被多个 goroutine
使用 (并发安全)Pool
通过管理一可能被重用的临时项目,提供了一种在许多客户端之间分摊分配开销的方法。
注意:存储在池中的任何项目(item)都可能随时自动删除,不会做另行通知;若删除时候发现,仅 Pool 拥有item的唯一的引用,则该item项目可能会被释放(资源回收)
一个很好地使用池的例子是在 fmt
包中,它维护一个动态大小的临时输出缓冲区存储,存储在负载下扩展(当许多 goroutine 正在积极打印时)并在休眠的时候缩小(资源回收)。
另一方面,作为短期对象的一部分维护的空闲列表不适合用 Pool 池化,因为开销不会摊销在那种情况下。让这些对象实现它们自己的空闲列表更有效, (临时对象重用性不高的不需要池化)
2.4.1. sync.Pool 方法
Get 从 Pool 中选择任意项目,将其从 Pool 中删除,并将其返回给调用者,调用者不应假定传递给 Put 的值与 Get 返回的值之间存在任何关系。
如果 p.Get()
返回 nil
,且 p.New()
不为零,则 p.Get()
返回 p.New()
的结果
1
2
3
4
5
6
7
8
9
10
| type Pool struct {
// New()指定一个函数生成一个值,注意该值不能和Get()同时并发修改
New func() any
}
// 从池中获取资源
pool.Get() any
// 将资源放回池
pool.Put(x any)
|
可以参考Mysql Pool各类客户端包在池化方面的使用
2.5. sync.Mutex 互斥锁 (读写均会阻塞)
1
2
3
| mu.Lock(): 锁定rw用于写入,其他G来读、写均会阻塞
mu.UnLock(): 解写锁
mu.TryLock() bool: 尝试加读写锁
|
2.6. sync.RWMutex 并发读写互斥 (支持并发读,在读多写少场景适用)
RWMutex 是读写器互斥锁,锁可以由任意数量的读取器或单个写入器持有,RWMutex的零值是未锁定的互斥锁。
注意: RWMutex 在首次使用后不得复制。
如果一个 goroutine
持有一个用于读取的 RWMutex,而另一个 goroutine 可能会调用 Lock,那么在初始读取锁被释放之前,任何 goroutine 都不应该期望能够获取读取锁。特别是,这禁止递归读锁定。这是为了确保锁最终可用;阻塞的 Lock 调用会阻止新读者获取锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| G1 -> 获取 rwmu,用于读取数据
G2 -> 获取 rwmu,可以加锁Lock()
G2..GN -> 获取 rwmu,想获取RLock() 失败,直到G2调用 UnLock()释放锁
rwmu.Lock(): 锁定rw用于写入,其他G来读、写均会阻塞
rwmu.UnLock(): 解写锁
rwmu.RLock(): 锁定rw用于读取,其他G可以并发读取,但写入会阻塞
rwmu.RUnlock(): 解读锁
rwmu.TryLock() bool: 尝试加读写锁
rwmu.TryRLock() bool: 尝试加读锁
rwmu.RLocker(): 或取读锁
|
2.7. sync.WaitGroup 等待
WaitGroup 等待一组 goroutine 完成
- 主
goroutine
调用 wg.Add()
来设置要等待的 goroutine 的数量。 - 然后,每个
goroutine
运行并在完成时调用 wg.Done()
- 同时,
wg.Wait()
可以用来阻塞,直到所有的 goroutine 都完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| package main
import (
"sync"
)
type httpPkg struct{}
func (httpPkg) Get(url string) {}
var http httpPkg
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.example.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
}
|
3. 标准库 sync/atomic
包
包 atomic
提供了用于实现同步算法的低级原子内存原语, 这些函数需要非常小心才能正确使用。
除了特殊的低级应用程序外,最好使用通道或sync 同步包的工具来完成同步。
第一性原则: 通过通信共享内存,不要通过共享内存进行通信。
3.1. atomic 包使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| // Add 以原子方式将 delta 添加到 *addr 并返回新值
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
// CAS比较交换
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
// 加载返回
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
// 存储设置
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
// 交换
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
// atomic.Value{} 类型
type Value
func (v *Value) CompareAndSwap(old, new any) (swapped bool)
func (v *Value) Load() (val any)
func (v *Value) Store(val any)
func (v *Value) Swap(new any) (old any)
|
3.1.1. atomic 方法对应的基本含义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| // Add
*addr += delta
return *addr
// CAS
if *addr == old {
*addr = new
return true
}
return false
// Swap
old = *addr
*addr = new
return old
// Store
*addr = val
// Load
return *addr
|
3.2. aotmic 使用Case
3.2.1. Value(Config) - 一个定期Reload更新Store,多个并发Load读取配置的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| import (
"sync/atomic"
"time"
)
// 加载配置
func loadConfig() map[string]string {
return make(map[string]string)
}
//
func requests() chan int {
return make(chan int)
}
func main() {
// 存储系统配置
var config atomic.Value
// 初始化系统配置,加载并存储
config.Store(loadConfig())
// 一个定期Reload Config的协程
go func() {
for {
time.Sleep(10 * time.Second)
config.Store(loadConfig())
}
}()
.
// 模拟多个协程并发处理接收的请求,并发的Load()配置
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// Handle request r using config c.
_, _ = r, c
}
}()
}
}
|
3.2.2. Value(ReadMostly) - 读多写少,采用写时复制
相比直接map操作,atomic下面的写时复制,锁冲突粒度更小,性能更高
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| import (
"sync"
"sync/atomic"
)
func main() {
//
type Map map[string]string
// 初始化atomic.Value,并存储Map
var m atomic.Value
m.Store(make(Map))
// 写锁
var mu sync.Mutex // used only by writers
// 从存储中读取数据,无需进一步再做同步处理
read := func(key string) (val string) {
m1 := m.Load().(Map)
return m1[key]
}
// insert function can be used to update the data without further synchronization
// 写入函数,针对Map并发写,加入
insert := func(key, val string) {
// mu加锁
mu.Lock() // synchronize with other potential writers
defer mu.Unlock()
// 原子读取
m1 := m.Load().(Map) // load current value of the data structure
// 写时复制,copy到一个新的map m2内
m2 := make(Map) // create a new value
for k, v := range m1 {
m2[k] = v // copy all data from the current object to the new one
}
// 针对m2 做一些更新
m2[key] = val // do the update that we need
// 重新设置m2到 atomic内 (锁冲突的时间很短)
m.Store(m2) // atomically replace the current object with the new one
// 这个点后的新读取,都读到新的值
// At this point all new readers start working with the new version.
// The old version will be garbage collected once the existing readers
// (if any) are done with it.
}
_, _ = read, insert
}
|
4. golang.org/sync
使用
1
2
| // import
go get -u golang.org/x/sync
|
4.1. errgroup
包 errgroup
为处理公共任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。
Group 是一组 goroutines,它们处理属于同一整体任务的子任务。Group 零值是有效的,对活动 goroutines
的数量没有限制,并且不会因错误而取消。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| type Group
// WithContext 返回一个新组和一个从 ctx 派生的关联上下文。
func WithContext(ctx context.Context) (*Group, context.Context)
// 在新的 goroutine 中调用给定的函数。它一直阻塞,直到可以添加新的 goroutine 而组中的活动 goroutine 数量不超过配置的限制。
func (g *Group) Go(f func() error)
// 并发度限制,负值表示没有限制
// SetLimit 将这个组中的活动 goroutine 的数量限制为最多 n,当组中的任何 goroutine 处于活动状态时,不得修改限制。
// 任何后续调用 Group.Go 方法都会阻塞,直到它可以添加一个活动的 goroutine 而不会超过配置的限制。
func (g *Group) SetLimit(n int)
// 并发度限制,TryGo不超过limit限制
// TryGo 仅当组中的活动 goroutine 数量当前低于配置的Limit n 限制时,才会在新的 goroutine 中调用给定函数。
// 返回值报告 goroutine 是否已启动。
func (g *Group) TryGo(f func() error) bool
// g.Wati等待阻塞直到来自 Go 方法的所有函数调用都返回,然后从它们返回第一个非零错误(如果有)。
func (g *Group) Wait() error
|
4.1.1. errgroup Example - JustErrors 简化 sync.WaitGroup 计数处理
JustErrors 说明了使用 errgroup.Group
代替 sync.WaitGroup
来简化 goroutine
计数和错误处理。此示例源自 https://golang.org/pkg/sync/#example_WaitGroup 上的 sync.WaitGroup 示例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
// 初始化errgroup.Group
g := new(errgroup.Group)
// 并发请求
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// 这是因为循环的每次迭代都使用变量 `url` 的相同实例,因此每个闭包共享该单个变量。
// 当闭包go func(){} 运行时,内部的url 可能在 goroutine 启动后已被修改
// 为了帮助在这些问题和其他问题发生之前发现它们,运行 go vet (Vet 检查 Go 源代码并报告可疑结构, `go doc cmd/vet` )
url := url // 闭包问题,需要重新赋值,或者是将变量作为参数传递给闭包(推荐) https://golang.org/doc/faq#closures_and_goroutines
// 闭包函数, Launch a goroutine to fetch the URL.
g.Go(func() error {
// Fetch the URL.
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
// 第一个返回 非 `nil` 错误会取消组Group,其错误将由 Wait 返回 (若有错想继续执行,则需要返回nil)
return err
})
}
// 等待所有HTTP fetch返回完成,若内部有一个错误,group被cancel
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
}
|
4.1.2. errgroup Example - Parallel 简化并行任务
Parallel 说明了使用 Group 来同步一个简单的并行任务:来自 https://talks.golang.org/2012/concurrency.slide#46 的“Google Search 2.0”功能,增加了上下文和错误处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
| package main
import (
"context"
"fmt"
"os"
"golang.org/x/sync/errgroup"
)
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
// 结果类型支持搜索
type Result string
type Search func(ctx context.Context, query string) (Result, error)
// fake搜索函数
func fakeSearch(kind string) Search {
// fakesearch func
return func(_ context.Context, query string) (Result, error) {
return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
}
}
func main() {
// Google函数,建设内容并返回结果
Google := func(ctx context.Context, query string) ([]Result, error) {
// errgroup 带ctx 初始化
g, ctx := errgroup.WithContext(ctx)
// 初始化搜到和结果
searches := []Search{Web, Image, Video}
results := make([]Result, len(searches))
for i, search := range searches {
i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
// 利用g.Go()开启协程执行并发搜索
g.Go(func() error {
result, err := search(ctx, query) // 执行fakesearch
if err == nil {
results[i] = result
}
// 有错直接返回
return err
})
}
// g.Wait()阻塞等待
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
// 检索golang
results, err := Google(context.Background(), "golang")
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
// 输出Google结果
for _, result := range results {
fmt.Println(result)
}
}
|
4.1.3. errgroup Example - Pipeline
Pipeline 演示了使用 Group 来实现多阶段管道:来自 https://blog.golang.org/pipelines 的具有有限并行性的 MD5All 函数版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
| package main
import (
"context"
"crypto/md5"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"golang.org/x/sync/errgroup"
)
// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
m, err := MD5All(context.Background(), ".")
if err != nil {
log.Fatal(err)
}
for k, sum := range m {
fmt.Printf("%s:\t%x\n", k, sum)
}
}
type result struct {
path string
sum [md5.Size]byte
}
// MD5All读取在root下的所有文件,返回所有文件的md5 sum值,如果路径walk失败,则返回一个错误
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
// ctx is canceled when g.Wait() returns. When this version of MD5All returns
// - even in case of error! - we know that all of the goroutines have finished
// and the memory they were using can be garbage-collected.
// 当g.Wait()返回时候,ctx被取消,所有groutine结束可以被GC回收
g, ctx := errgroup.WithContext(ctx)
paths := make(chan string)
// 第一个Goroutine用于遍历path下的文件,输入到paths chan内
g.Go(func() error {
// 关闭 chan
defer close(paths)
// filepath.Walk() 迭代文件路径
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path: // 有路径,放入paths chan
case <-ctx.Done(): // ctx完成,返回ctx.Err
return ctx.Err()
}
return nil
})
})
// Start a fixed number of goroutines to read and digest files.
// 开启另外一系列Groutine,来读取文件,并计算md5摘要 -> 并发操作
c := make(chan result)
const numDigesters = 20
for i := 0; i < numDigesters; i++ {
g.Go(func() error {
for path := range paths { // 迭代文件path信息,读取文件,计算md5摘要
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
select {
case c <- result{path, md5.Sum(data)}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
// 等待摘要所有Group的Groutine处理完成,再关闭result通道
go func() {
g.Wait()
close(c)
}()
// 迭代result通道,将数据记录到map,并返回
m := make(map[string][md5.Size]byte)
for r := range c {
m[r.path] = r.sum
}
// 检查是否有任何 goroutines 失败,由于 g 正在累积错误,我们不需要独自发送它们(或检查它们)在通道上发送的结果
if err := g.Wait(); err != nil {
return nil, err
}
return m, nil
}
|
4.2. semaphore
包 semaphore 提供加权信号量实现
4.3. singlefight - singlefight.Group
包 singleflight 提供了重复函数调用抑制机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| // 组代表一类工作,并形成一个命名空间,其中工作单元可以通过重复抑制来执行。
type Group struct {
// contains filtered or unexported fields
}
// Do 执行并返回给定函数的结果,确保一次只针对给定键执行一次执行。
// 如果出现key重复,则重复调用者会等待原始调用者完成并接收相同的结果。
// 返回值 shared 指示是否将 v 提供给多个调用者。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
// DoChan 与 Do 类似,但返回一个通道,当结果就绪时将接收结果。
// 返回的通道不会关闭。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
// 通道内消息的结果结构体类型
type Result struct {
Val interface{}
Err error
Shared bool
}
// Forget 告诉 singleflight 忘记一个键,
// 未来对该键的 Do 调用将调用该函数,而不是等待较早的调用完成。
func (g *Group) Forget(key string)
|
4.4. syncmap
包 syncmap 提供了一个并发映射实现。
5. 参考
- golang.org/x/sync: https://pkg.go.dev/golang.org/x/sync@v0.0.0-20220601150217-0de741cfad7f
- pkg.go.dev/sync/atomi
- pkg.go.dev/sync