Go Study Day06 - 并发
# 基本概念
# 串行、并发与并行
串行:我们都是先读小学,小学毕业后再读初中,读完初中再读高中。
并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。
并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。
# 进程、线程和协程
进程(process):程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
线程(thread):操作系统基于进程开启的轻量级进程,是操作系统调度执行的最小单位。
协程(coroutine):非操作系统提供而是由用户自行创建和控制的用户态‘线程’,比线程更轻量级。
goroutine与操作系统线程(OS线程)的区别?
goroutine是用户态的线程,比内核态的线程更轻量级一点,初始时只占用2KB的栈空间。可以轻松开启数十万的goroutine也不会崩内存。
# goroutine
goroutine
是用户态的线程,比内核态线程更轻量级,是由Go语言的运行时(runtime)调度的。
# 启动goroutine
将要并发执行的任务包装成一个函数,调用函数的时候前面加上go
关键字,就能够开启一个goroutine去执行该函数的任务。
package main
import (
"fmt"
//"time"
)
func hello() {
fmt.Println("hello")
}
func main() {
go hello()
fmt.Println("你好")
// time.Sleep(time.Second)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
为什么会先打印你好
呢?
这是因为在程序中创建 goroutine 执行函数需要一定的开销,而与此同时 main 函数所在的 goroutine 是继续执行的。
可以通过sleep的方式让goroutine执行完,但不够优雅。
# sync.WaitGroup
我们在 main goroutine 中使用sync.WaitGroup
来等待 hello goroutine 完成后再退出。
package main
import (
"fmt"
"sync"
)
// 声明全局等待组变量
var wg sync.WaitGroup
func hello() {
fmt.Println("hello")
wg.Done() // 告知当前goroutine完成
}
func main() {
wg.Add(1) // 登记1个goroutine
go hello()
fmt.Println("你好")
wg.Wait() // 阻塞等待登记的goroutine完成
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# goroutine
什么结束?
goroutine 对应的函数结束了,goroutine结束了。
main
函数执行完了,由main
函数创建的那些goroutine
都结束了。
# goroutine
调度模型
- G:表示 goroutine,每执行一次
go f()
就创建一个 G,包含要执行的函数和上下文信息。 - 全局队列(Global Queue):存放等待运行的 G。
- P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。
- P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。
- M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
- Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。
# GOMAXPROCS
Go1.5之后默认就是操作系统的逻辑核心数,默认跑满CPU
runtime.GOMAXPROCS(1)
:只占用一个核。
# channel
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
Go语言采用的并发模型是CSP(Communicating Sequential Processes)
,提倡通过通信共享内存而不是通过共享内存而实现通信。
# channel类型
var 变量名称 chan 元素类型
var b chan int // 需要指定通道中元素的类型
2
通道必须使用make函数初始化才能使用!!!
ch1 := make(chan int) // 无缓冲
ch2 := make(chan bool, 1) // 声明一个缓冲区大小为1的通道
2
# 通道的操作
<-
- 发送 :
ch1 <- 1
- 接收:
<- ch1
- 关闭:
close()
# 单向通道
多用于函数传参,限制函数中通道的操作。
<- chan int // 只接收通道,只能接收不能发送
chan <- int // 只发送通道,只能发送不能接收
2
# demo
package main
import (
"fmt"
)
func producer(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func consumer(ch chan int, done chan bool) {
for {
val, ok := <- ch
if ok {
fmt.Println(val)
} else {
done <- true
return
}
}
}
func main() {
ch := make(chan int)
done := make(chan bool)
go producer(ch)
go consumer(ch, done)
<- done
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
上述代码中,我们定义了两个 goroutine:producer()
和 consumer()
。producer()
生产整数数据并将其发送到一个通道 ch
中;consumer()
则从通道 ch
中接收数据并消费。
主函数中,我们启动两个 producer()
和 consumer()
的 goroutine,然后等待 consumer()
结束信号从 done
通道中读取。当 consumer()
接收到 ch
通道被关闭的信号时,它会将一个 boolean 类型的值发送到 done
通道中表示其结束,并退出。
简单调用 go run
命令运行该程序即可看到生产者和消费者 goroutine 的交互过程和输出结果。
Q:如果是无缓冲通道,此时关闭通道,生产者还可以继续往通道发消息吗?
A:对于无缓冲通道,关闭通道会导致无法再向该通道发送数据,任何试图发送数据的操作都会导致程序 panic。因此,在通道被关闭之前,生产者可以继续往通道发送消息,但在通道被关闭后,生产者无法继续发送消息。
Q:此时我们用的是无缓冲的通道,假设是有缓冲的通道,如果通道有数据,此时关闭通道,读取通道,ok值是false吗?
A:答案是否定的。关闭一个非空的通道时,其中的数据可以继续被读取直到通道中的所有数据都被读取完毕。在数据被全部读取之前,读取通道后的ok值仍然是true。只有在通道中的数据被全部读取之后,再次读取通道时,ok值才会变为false。
# 总结
# select多路复用
Select 的使用方式类似于之前学到的 switch 语句,它也有一系列 case 分支和一个默认的分支。每个 case 分支会对应一个通道的通信(接收或发送)过程。select 会一直等待,直到其中的某个 case 的通信操作完成时,就会执行该 case 分支对应的语句。具体格式如下:
select {
case <-ch1:
//...
case data := <-ch2:
//...
case ch3 <- 10:
//...
default:
//默认操作
}
2
3
4
5
6
7
8
9
10
# 特点
- 可处理一个或多个 channel 的发送/接收操作。
- 如果多个 case 同时满足,select 会随机选择一个执行。如果多个case都想执行,需要在外层套一个for{},无限循环在迫使select一直执行下去,这样可使多个case都会被执行到。
- 对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出。
package main
import "fmt"
func main() {
ch := make(chan int, 1)
for i := 1; i <= 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# workPool
控制并发,当有10个任务需要处理时,可以先把任务线程创建出来,通过控制并发数量慢慢处理。
var wg sync.WaitGroupt
func main(){
userCount :=10
ch := make(chan bool,2)
for i:=0;i userCount;i++ {
wg.Add(1)
go Read(ch,i)
}
wg.Wait()
}
func Read(ch chan bool,i int){
defer wg.Done()
ch <-true
fmt.Printf("go func:%d,time:%d\n",i,time.Now().Unix())
time.Sleep(time.Second)
<-ch
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17