Goroutine
Go 语言最大的特色就是从语言层面支持并发(Goroutine),Goroutine 是 Go 中最基本的执行单元。事实上每一个 Go 程序至少有一个 Goroutine:main goroutine。当程序启动时,它会自动创建。Goroutine 也可看作是轻量级的线程,创建 goroutine 的开销要远远小于线程,在 Golang 中创建成千上万个 goroutine 是常有的事情。
在函数调用前加上go
关键字,这次调用就会在一个新的 goroutine 中并发执行。当被调用的函数返回了或执行结束了,这个 goroutine 也自动结束。
例子:
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("Hello world goroutine")
}
func main() {
go hello()
time.Sleep(1 * time.Second)
fmt.Println("main function")
}
CSP 并发模型
CSP(communicating sequential processes)并发模型是在 1970 年左右提出的概念,属于比较新的概念,不同于传统的多线程通过共享内存来通信,CSP 讲究的是“以通信的方式来共享内存”。
“不要以共享内存的方式来通信,相反,要通过通信来共享内存。”
Go 的 CSP 并发模型,是通过goroutine
和channel
来实现的。
goroutine
是 Go 语言中并发的执行单位。channel
是 Go 语言中各个并发结构体(goroutine)之前的通信机制。通俗的讲,就是各个goroutine
之间通信的”管道“,有点类似于 Linux 中的管道。
channel
的使用:
// 声明了一个信道
var ch chan int
// 快速声明
a := make(chan int)
// 创建一个带缓冲的 channel
c := make(chan int, 1024)
for i:=range c {
...
}
// 通过信道发送和接收数据
data := <- a // 从channel a 读取数据
a <- data // 将数据写入到 channel a
完整例子:
package main
import (
"fmt"
"time"
)
func Producer(queue chan<- int) {
for i := 0; i < 20; i++ {
queue <- i //写入
fmt.Println("create :", i)
}
}
func Consumer(queue <-chan int) {
time.Sleep(1 * time.Second)
for i := 0; i < 10; i++ {
v := <-queue // 读出
fmt.Println("receive:", v)
}
}
func main() {
queue := make(chan int, 88)
go Producer(queue)
go Consumer(queue)
go Consumer(queue)
time.Sleep(2 * time.Second)
}
生命周期
goroutine leak
我们先看一个例子:
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
// Capture starting number of goroutines.
startingGs := runtime.NumGoroutine()
leak()
// Hold the program from terminating for 1 second to see
// if any goroutines created by leak terminate.
time.Sleep(time.Second)
// Capture ending number of goroutines.
endingGs := runtime.NumGoroutine()
// Report the results.
fmt.Println("========================================")
fmt.Println("Number of goroutines before:", startingGs)
fmt.Println("Number of goroutines after :", endingGs)
fmt.Println("Number of goroutines leaked:", endingGs-startingGs)
}
// leak is a buggy function. It launches a goroutine that
// blocks receiving from a channel. Nothing will ever be
// sent on that channel and the channel is never closed so
// that goroutine will be blocked forever.
func leak() {
ch := make(chan int)
go func() {
val := <-ch
fmt.Println("We received a value:", val)
}()
}
========================================
Number of goroutines before: 1
Number of goroutines after : 2
Number of goroutines leaked: 1
我们看leak()
这段代码创建了一个chan int
,并启动一个 goroutine 来获取它的数据。但是这个 goroutine 只有 ch 关闭的时才会退出,那么 ch 什么时候关闭了,ch 可能永远都不会关闭,导致 goroutine leak。
所以,每次起一个 goroutine 都该问自己两个问题:
- 它什么时候该结束?
- 我怎么结束它?
使用 channel 控制
下面程序展示一个使用channel控制子协程的例子:
package main
import (
"fmt"
"runtime"
"time"
)
func Process(ch chan int) {
time.Sleep(time.Second)
ch <- 1 //管道中写入一个元素表示当前协程已结束
}
func main() {
// 监控启动前有多个 goroutine
startingGs := runtime.NumGoroutine()
channels := make([]chan int, 10) //创建一个10个元素的切片,元素类型为channel
for i := 0; i < 10; i++ {
channels[i] = make(chan int) //切片中放入一个channel
go Process(channels[i]) //启动协程,传一个管道用于通信
}
for i, ch := range channels { //遍历切片,等待子协程结束
v := <-ch
fmt.Println("Routine ", i, v, " quit!")
}
// 监控启动完还有多个 goroutine
endingGs := runtime.NumGoroutine()
fmt.Println("========================================")
fmt.Println("Number of goroutines before:", startingGs)
fmt.Println("Number of goroutines after :", endingGs)
fmt.Println("Number of goroutines leaked:", endingGs-startingGs)
}
程序执行完如下:
Routine 0 1 quit!
Routine 1 1 quit!
Routine 2 1 quit!
Routine 3 1 quit!
Routine 4 1 quit!
Routine 5 1 quit!
Routine 6 1 quit!
Routine 7 1 quit!
Routine 8 1 quit!
Routine 9 1 quit!
========================================
Number of goroutines before: 1
Number of goroutines after : 1
Number of goroutines leaked: 0
上面程序通过创建 N 个 channel 来管理 N 个协程,每个协程都有一个 channel 用于跟父协程通信,父协程创建完所有协程中等待所有协程结束,同时监控启动前后的 goroutine 的数量,确保程序执行没有发生 go leak。
这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要定期的探测管道中是否有消息出现。
使用 context 控制
context 翻译成中文是"上下文",即它可以控制一组呈树状结构的 goroutine,每个 goroutine 拥有相同的上下文。
cannel 例子
一个使用 cancel context 的例子如下所示:
package main
import (
"context"
"fmt"
"runtime"
"time"
)
func HandelRequest(ctx context.Context) {
go WriteRedis(ctx)
go WriteDatabase(ctx)
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running")
time.Sleep(2 * time.Second)
}
}
}
func WriteRedis(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteRedis Done.")
return
default:
fmt.Println("WriteRedis running")
time.Sleep(2 * time.Second)
}
}
}
func WriteDatabase(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteDatabase Done.")
return
default:
fmt.Println("WriteDatabase running")
time.Sleep(2 * time.Second)
}
}
}
func main() {
startingGs := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go HandelRequest(ctx)
time.Sleep(5 * time.Second)
fmt.Println("It's time to stop all sub goroutines!")
cancel()
time.Sleep(5 * time.Second)
// 监控启动完还有多个 goroutine
endingGs := runtime.NumGoroutine()
fmt.Println("========================================")
fmt.Println("Number of goroutines before:", startingGs)
fmt.Println("Number of goroutines after :", endingGs)
fmt.Println("Number of goroutines leaked:", endingGs-startingGs)
}
上面代码中协程HandelRequest()
用于处理某个请求,其又会创建两个协程:WriteRedis()
、WriteDatabase()
,main 协程创建创建 context,并把 context 在各子协程间传递,main 协程在适当的时机可以 cancel 掉所有子协程。
程序输出如下所示:
HandelRequest running
WriteDatabase running
WriteRedis running
WriteRedis running
HandelRequest running
WriteDatabase running
HandelRequest running
WriteRedis running
WriteDatabase running
It's time to stop all sub goroutines!
WriteDatabase Done.
WriteRedis Done.
HandelRequest Done.
========================================
Number of goroutines before: 1
Number of goroutines after : 1
Number of goroutines leaked: 0
timeout 例子
使用WithTimeout()
获得一个context,并在其了协程中传递,如下所示:
package main
import (
"context"
"fmt"
"runtime"
"time"
)
func HandelRequest(ctx context.Context) {
go WriteRedis(ctx)
go WriteDatabase(ctx)
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running")
time.Sleep(2 * time.Second)
}
}
}
func WriteRedis(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteRedis Done.")
return
default:
fmt.Println("WriteRedis running")
time.Sleep(2 * time.Second)
}
}
}
func WriteDatabase(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteDatabase Done.")
return
default:
fmt.Println("WriteDatabase running")
time.Sleep(2 * time.Second)
}
}
}
func main() {
startingGs := runtime.NumGoroutine()
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
go HandelRequest(ctx)
time.Sleep(10 * time.Second)
// 监控启动完还有多个 goroutine
endingGs := runtime.NumGoroutine()
fmt.Println("========================================")
fmt.Println("Number of goroutines before:", startingGs)
fmt.Println("Number of goroutines after :", endingGs)
fmt.Println("Number of goroutines leaked:", endingGs-startingGs)
}
主协程中创建一个5秒超时的context,并将其传递给子协程,使得子协程在5秒超时到了就自动退出。
程序输出如下:
HandelRequest running
WriteRedis running
WriteDatabase running
WriteDatabase running
WriteRedis running
HandelRequest running
WriteRedis running
WriteDatabase running
HandelRequest running
WriteRedis Done.
HandelRequest Done.
WriteDatabase Done.
========================================
Number of goroutines before: 1
Number of goroutines after : 1
Number of goroutines leaked: 0
使用 WaitGroup 控制
WaitGroup,可理解为 Wait-Goroutine-Group,即等待一组 goroutine 结束。比如某个 goroutine 需要等待其他几个 goroutine 全部完成,那么使用 WaitGroup 可以轻松实现。
下面程序展示了一个 goroutine 等待另外两个 goroutine 结束的例子:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func process(duration time.Duration, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(duration)
fmt.Println("Goroutine finished!", duration)
}
func main() {
startingGs := runtime.NumGoroutine()
var wg sync.WaitGroup
wg.Add(2) // 设置计数器,数值即为goroutine的个数
go process(1*time.Second, &wg) // 等待1秒
go process(2*time.Second, &wg) // 等待2秒
wg.Wait() // 主 goroutine 阻塞等待计数器变为0
fmt.Printf("All Goroutine finished!")
// 监控启动完还有多个 goroutine
endingGs := runtime.NumGoroutine()
fmt.Println("========================================")
fmt.Println("Number of goroutines before:", startingGs)
fmt.Println("Number of goroutines after :", endingGs)
fmt.Println("Number of goroutines leaked:", endingGs-startingGs)
}
简单的说,上面程序中 wg 内部维护了一个计数器:
- 启动 goroutine 前将计数器通过
Add(2)
将计数器设置为待启动的 goroutine 个数 - 启动 goroutine 后,使用
Wait()
方法阻塞自己,等待计数器变为0 - 每个 goroutine 执行结束通过
Done()
方法将计数器减1
4、计数器变为0后,阻塞的goroutine被唤醒
程序输出如下:
Goroutine finished! 1s
Goroutine finished! 2s
All Goroutine finished!========================================
Number of goroutines before: 1
Number of goroutines after : 1
Number of goroutines leaked: 0
使用 errgroup 控制
在实际的项目代码中,子任务 goroutine 的执行并不总是顺风顺水,它们也许会产生 error。而 WaitGroup 并没有告诉我们在子 goroutine 发生错误时,如何将其抛给主任务 groutine。这个时候可以考虑使用 errgroup。
下面程序展示使用了 errgroup 的例子:
package main
import (
"context"
"fmt"
"runtime"
"time"
"golang.org/x/sync/errgroup"
)
func produce(dataChan chan int) error {
for i := 1; ; i++ {
if i == 10 {
return fmt.Errorf("data 10 is wrong")
}
dataChan <- i
fmt.Println(fmt.Sprintf("sending %d", i))
}
}
func consume(ctx context.Context, dataChan chan int, i int) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case number := <-dataChan:
fmt.Println(fmt.Sprintf("job-%d receiving %d", i, number))
}
}
}
func main() {
startingGs := runtime.NumGoroutine()
g, ctx := errgroup.WithContext(context.Background())
dataChan := make(chan int, 20)
defer close(dataChan)
//生产端者任务子 goroutine
g.Go(func() error {
return produce(dataChan)
})
// 拉3个消费者任务子 goroutine
for i := 0; i < 3; i++ {
i := i
g.Go(func() error {
return consume(ctx, dataChan, i)
})
}
time.Sleep(1 * time.Second)
// 主任务 goroutine 等待 pipeline 结束数据流
if err := g.Wait(); err != nil {
fmt.Println(err)
}
fmt.Println("main goroutine done!")
// 监控启动完还有多个 goroutine
endingGs := runtime.NumGoroutine()
fmt.Println("========================================")
fmt.Println("Number of goroutines before:", startingGs)
fmt.Println("Number of goroutines after :", endingGs)
fmt.Println("Number of goroutines leaked:", endingGs-startingGs)
}
我们模拟了一个数据传送管道。有四个子任务 goroutine:一个生产数据的 goroutine,三个是数据消费的 goroutine。当数据生产者存在错误数据时(数据等于10),我们停止数据的生产与消费,并将错误抛出,回到 main goroutine 的执行逻辑中。
errgroup 通过嵌入context.WithCancel
方法产生的 cancel 函数,能够在子 goroutine 发生错误时,及时通过调用 cancel 函数,将 context 的取消信号及时传播出去。
程序执行完输出如下:
sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
job-2 receiving 1
job-2 receiving 4
job-0 receiving 2
job-1 receiving 3
job-1 receiving 5
data 10 is wrong
main goroutine done!
========================================
Number of goroutines before: 1
Number of goroutines after : 1
Number of goroutines leaked: 0
总结
我们在使用 goroutine 时需要注意是否会发生 leak,所以要时时刻刻在每次使用 goroutine 时,要多问自己这个 goroutine 什么时候结束。
然后我们也提供了4个控制 goroutine 生命周期的例子,分别是 Channel、Context、WaitGroup和errgroup。
- Channel: 使用channel控制子协程
- Context: 使用上下文控制子协程
- WaitGroup : 使用信号量机制控制子协程
- errgroup : 基于 WaitGroup,提供 context 反向传播机制
这4个方案种方案各有优劣,Channel 优点是实现简单,清晰易懂,Context 优点是对子协程派生出来的孙子协程的控制,WaitGroup 优点是子协程个数动态可调整,errgroup 优点是子协程错误可控可反向传播。
参考
- https://www.ardanlabs.com/blog/2018/11/goroutine-leaks-the-forgotten-sender.html
- https://dave.cheney.net/2016/12/22/never-start-a-goroutine-without-knowing-how-it-will-stop
- https://books.studygolang.com/GoExpertProgramming/chapter05/
本文由 Chakhsu Lau 创作,采用 知识共享署名4.0 国际许可协议进行许可。
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。