Go Study Day07 - 并发安全
# 为什么会有并发安全
多个线程同时改数据,以谁为准?所以有了并发安全。
那什么类型的数据需要开发者关注并发安全呢?
# 原子类型
原子类型意味着该数据类型的读取和写入操作是原子的,即在同一时刻只能有一个goroutine访问和修改该值,保证了该操作的原子性和可见性,不会出现竞争状态,因此在多个goroutine对其进行读写时不需要额外的同步机制(如锁)来保证数据的正确性和并发安全。
原子操作类型在Go语言中的sync/atomic
标准库中,包含以下类型:
- int32
- int64
- uint32
- uint64
- uintptr
- unsafe.Pointer
这些类型提供了一些原子操作,例如加减运算、交换、比较和交换等,这些操作可以在并发执行的多个goroutine之间安全地保留变量的原子性。这使得在并发执行的情况下使用共享内存变得更加安全和高效。
系统默认来保证原子类型在做原子操作时的并发安全。
# 非原子类型
而map、切片等非原子类型则系统并不会来保证他的并发安全,因为那样会让他变的低效。
在并发环境下,对于 Go 语言的 Map 而言,
- 当多个 goroutine 并发写入或者读取 map 时,会出现竞争条件而导致运行时错误或者未定义的行为。
- 单独的读取、覆盖或者增加操作不会引起竞态条件问题,但是由于 map 的实现机制导致读写 map 会涉及到对 map 中的桶、链表等内部数据的修改,因此具有一定的复杂性。
所以,虽然增加 key value 操作不会出现竞态条件,但是在并发环境中还是需要使用 sync.Map 或者加锁的方式来保证 map 的并发安全性。
# 并发安全
# 互斥锁
适用于并发访问公共资源的场景。
声明互斥锁
var lock sync.Mutex
使用互斥锁
func add() {
for i := 0; i < 500000; i++ {
lock.Lock() // 获取互斥锁
x = x + 1
lock.Unlock() // 释放互斥锁
}
wg.Done()
}
2
3
4
5
6
7
8
使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。
# 读写互斥锁
读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。
声明读写互斥锁
var rwLock sync.RWMutex
使用读写互斥锁
// 读操作
func read() {
defer wg.Done()
rwLock.RLock() // 获取读锁
fmt.Println(x)
time.Sleep(time.Millisecond * 2)
rwLock.RUnlock() // 释放读锁
}
// 写操作
func write() {
defer wg.Done()
rwLock.Lock() // 获取写锁
x = x + 1
time.Sleep(time.Millisecond * 10)
rwLock.Unlock() // 释放写锁
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能。不过需要注意的是如果一个程序中的读操作和写操作数量级差别不大,那么读写互斥锁的优势就发挥不出来。
# sync.WaitGroup
day06并发里面已经使用过,主要用于阻塞主进程,在协程执行完成之前不退出。
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
# sync.Once
适用于那些只执行一次的场景。
例如,只加载一次图片、只关闭一次channel
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# sync.Map
Go内置的map不是并发安全的。
// sync.Map 是一个开箱即用的并发安全的map
var m2 = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 21; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m2.Store(key, n) // 必须使用sync.Map内置的Store方法设置键值对
value, _ := m2.Load(key) // 必须使用sync.Map提供的Load方法根据key取值
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 原子操作
针对整数数据类型(int32、uint32、int64、uint64)我们还可以使用原子操作来保证并发安全,通常直接使用原子操作比使用锁操作效率更高。Go语言中原子操作由内置的标准库sync/atomic
提供。
# atomic
方法 | 释义 |
---|---|
func LoadInt32(addr *int32) (val int32) | 读取操作 |
func LoadUint32(addr *uint32) (val uint32) | 读取操作 |
func LoadUint64(addr *uint64) (val uint64) | 读取操作 |
func LoadUintptr(addr *uintptr) (val uintptr) | 读取操作 |
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) | 读取操作 |
func StoreInt32(addr *int32, val int32) | 写入操作 |
func StoreInt64(addr *int64, val int64) | 写入操作 |
func StoreUint32(addr *uint32, val uint32) | 写入操作 |
func StoreUint64(addr *uint64, val uint64) | 写入操作 |
func StoreUintptr(addr *uintptr, val uintptr) | 写入操作 |
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) | 写入操作 |
func AddInt32(addr *int32, delta int32) (new int32) | 修改操作 |
func AddInt64(addr *int64, delta int64) (new int64) | 修改操作 |
func AddUint32(addr *uint32, delta uint32) (new uint32) | 修改操作 |
func AddUint64(addr *uint64, delta uint64) (new uint64) | 修改操作 |
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 修改操作 |
func SwapInt32(addr *int32, new int32) (old int32) | 交换操作 |
func SwapInt64(addr *int64, new int64) (old int64) | 交换操作 |
func SwapUint32(addr *uint32, new uint32) (old uint32) | 交换操作 |
func SwapUint64(addr *uint64, new uint64) (old uint64) | 交换操作 |
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) | 交换操作 |
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 交换操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) | 比较并交换操作 |
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) | 比较并交换操作 |
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) | 比较并交换操作 |
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) | 比较并交换操作 |
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) | 比较并交换操作 |
# 性能对比
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
// 普通版
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc() {
c.counter++
}
func (c CommonCounter) Load() int64 {
return c.counter
}
// 互斥锁版
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// 原子操作版
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}
func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main() {
c1 := CommonCounter{} // 非并发安全
test(c1)
c2 := MutexCounter{} // 使用互斥锁实现并发安全
test(&c2)
c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
test(&c3)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81