Go Study Day06 - 并发

2019/8/3 go学习提升

# 基本概念

# 串行、并发与并行

串行:我们都是先读小学,小学毕业后再读初中,读完初中再读高中。

并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。

并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。

# 进程、线程和协程

进程(process):程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

线程(thread):操作系统基于进程开启的轻量级进程,是操作系统调度执行的最小单位。

协程(coroutine):非操作系统提供而是由用户自行创建和控制的用户态‘线程’,比线程更轻量级。

goroutine与操作系统线程(OS线程)的区别?

​ goroutine是用户态的线程,比内核态的线程更轻量级一点,初始时只占用2KB的栈空间。可以轻松开启数十万的goroutine也不会崩内存。

# goroutine

goroutine是用户态的线程,比内核态线程更轻量级,是由Go语言的运行时(runtime)调度的。

# 启动goroutine

将要并发执行的任务包装成一个函数,调用函数的时候前面加上go关键字,就能够开启一个goroutine去执行该函数的任务。

package main

import (
	"fmt"
	//"time"
)

func hello() {
	fmt.Println("hello")
}

func main() {
	go hello()
	fmt.Println("你好")
	// time.Sleep(time.Second)
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

为什么会先打印你好呢?

这是因为在程序中创建 goroutine 执行函数需要一定的开销,而与此同时 main 函数所在的 goroutine 是继续执行的。

image-20230316115639836

可以通过sleep的方式让goroutine执行完,但不够优雅。

# sync.WaitGroup

我们在 main goroutine 中使用sync.WaitGroup来等待 hello goroutine 完成后再退出。

package main

import (
	"fmt"
	"sync"
)

// 声明全局等待组变量
var wg sync.WaitGroup

func hello() {
	fmt.Println("hello")
	wg.Done() // 告知当前goroutine完成
}

func main() {
	wg.Add(1) // 登记1个goroutine
	go hello()
	fmt.Println("你好")
	wg.Wait() // 阻塞等待登记的goroutine完成
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# goroutine什么结束?

goroutine 对应的函数结束了,goroutine结束了。

main函数执行完了,由main函数创建的那些goroutine都结束了。

# goroutine调度模型

  • G:表示 goroutine,每执行一次go f()就创建一个 G,包含要执行的函数和上下文信息。
  • 全局队列(Global Queue):存放等待运行的 G。
  • P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。
  • P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。
  • M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
  • Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。

# GOMAXPROCS

Go1.5之后默认就是操作系统的逻辑核心数,默认跑满CPU

runtime.GOMAXPROCS(1):只占用一个核。

# channel

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

Go语言采用的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

# channel类型

var 变量名称 chan 元素类型
var b chan int // 需要指定通道中元素的类型
1
2

通道必须使用make函数初始化才能使用!!!

ch1 := make(chan int) // 无缓冲
ch2 := make(chan bool, 1)  // 声明一个缓冲区大小为1的通道
1
2

# 通道的操作

<-

  1. 发送 : ch1 <- 1
  2. 接收: <- ch1
  3. 关闭:close()

# 单向通道

多用于函数传参,限制函数中通道的操作。

<- chan int // 只接收通道,只能接收不能发送
chan <- int // 只发送通道,只能发送不能接收
1
2

# demo

package main

import (
    "fmt"
)

func producer(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch chan int, done chan bool) {
    for {
        val, ok := <- ch
        if ok {
            fmt.Println(val)
        } else {
            done <- true
            return
        }
    }
}

func main() {
    ch := make(chan int)
    done := make(chan bool)
    go producer(ch)
    go consumer(ch, done)
    <- done
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

上述代码中,我们定义了两个 goroutine:producer()consumer()producer() 生产整数数据并将其发送到一个通道 ch 中;consumer() 则从通道 ch 中接收数据并消费。

主函数中,我们启动两个 producer()consumer() 的 goroutine,然后等待 consumer() 结束信号从 done 通道中读取。当 consumer() 接收到 ch 通道被关闭的信号时,它会将一个 boolean 类型的值发送到 done 通道中表示其结束,并退出。

简单调用 go run 命令运行该程序即可看到生产者和消费者 goroutine 的交互过程和输出结果。

Q:如果是无缓冲通道,此时关闭通道,生产者还可以继续往通道发消息吗?

A:对于无缓冲通道,关闭通道会导致无法再向该通道发送数据,任何试图发送数据的操作都会导致程序 panic。因此,在通道被关闭之前,生产者可以继续往通道发送消息,但在通道被关闭后,生产者无法继续发送消息。

Q:此时我们用的是无缓冲的通道,假设是有缓冲的通道,如果通道有数据,此时关闭通道,读取通道,ok值是false吗?

A:答案是否定的。关闭一个非空的通道时,其中的数据可以继续被读取直到通道中的所有数据都被读取完毕。在数据被全部读取之前,读取通道后的ok值仍然是true。只有在通道中的数据被全部读取之后,再次读取通道时,ok值才会变为false。

# 总结

image-20230316130838239

image-20230316132407073

# select多路复用

Select 的使用方式类似于之前学到的 switch 语句,它也有一系列 case 分支和一个默认的分支。每个 case 分支会对应一个通道的通信(接收或发送)过程。select 会一直等待,直到其中的某个 case 的通信操作完成时,就会执行该 case 分支对应的语句。具体格式如下:

select {
case <-ch1:
	//...
case data := <-ch2:
	//...
case ch3 <- 10:
	//...
default:
	//默认操作
}
1
2
3
4
5
6
7
8
9
10

# 特点

  • 可处理一个或多个 channel 的发送/接收操作。
  • 如果多个 case 同时满足,select 会随机选择一个执行。如果多个case都想执行,需要在外层套一个for{},无限循环在迫使select一直执行下去,这样可使多个case都会被执行到。
  • 对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出。
package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	for i := 1; i <= 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# workPool

控制并发,当有10个任务需要处理时,可以先把任务线程创建出来,通过控制并发数量慢慢处理。

var wg sync.WaitGroupt
func main(){
	userCount :=10
	ch := make(chan bool,2)
	for i:=0;i userCount;i++ {
		wg.Add(1)
		go Read(ch,i)
	}
		wg.Wait()
}
func Read(ch chan bool,i int){
	defer wg.Done()
	ch <-true
	fmt.Printf("go func:%d,time:%d\n",i,time.Now().Unix())
	time.Sleep(time.Second)
	<-ch
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Last Updated: 2023/4/26
只爱西经
林一