Gumi-Termainl

夜风拂过,暗夜飘摇,轻奏音色,飘舞静寂.

关于如何使用groutine运行程序检测并修正竞争状态利用通道共享数据。参考至《GO语言实战》第六章内容

使用groutine运行程序**

检测并修正竞争状态

利用通道共享数据

当一个函数创建为 goroutine时,Go 会将其视为一个独立的工作单元。

  • 并发与并行

当运行一个应用程序(如一个 IDE 或者编辑器)的时候,操作系统会为这个应用程序启动一个进程。可以将这个进程看作一个包含了应用程序在运行中需要用到和维护的各种资源的容器。

一个线程是一个执行空间,这个空间会被操作系统调度来运行函数中所写的代码。

并发(concurrency)不是并行(parallelism)。并行是让不同的代码片段同时在不同的物理处理器上执行。

并行的关键是同时做很多事情,而并发是指同时管理很多事情,这些事情可能只做了一半就被暂停去做别的事情了。

  • gouroutine

一个正运行的 goroutine 在工作结束前,可以被停止并重新调度。

调度器这样做的目的是防止某个 goroutine 长时间占用逻辑处理器。当 goroutine 占用时间过长时,调度器会停止当前正运行的 goroutine,并给其他可运行的 goroutine 运行的机会。

和其他函数调用一样,创建为 goroutine 的函数调用时可以传入参数。不过 goroutine 终止时无法获取函数的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"runtime"
"sync"
)
//这个示例程序战士goroutine调度器是如何在单个线程上切分时间片的

//wg是用来等待程序完成的
var wg sync.WaitGroup

//main
func main(){
//分配一个逻辑处理器给调度器使用
runtime.GOMAXPROCS(1)
//计数加2,表示要等待两个goroutine
wg.Add(2)
//创建两个goroutine
fmt.Println("Create Groutines")
go printPrime("A")
go printPrime("B")
//等待goroutine结束
fmt.Println("Waiting To Finish")
wg.Wait()
fmt.Println("Terminating Program")
}
func printPrime(prefix string){
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
next:
for outer:=2;outer<5000;outer++ {
for inner:=2;inner < outer;inner++{
if outer%inner==0{
continue next
}
}
fmt.Printf("%s:%d\n",prefix,outer)
}
fmt.Println("Completed",prefix)
}

记住,只有在有多个逻辑处理器且可以同时让每个goroutine 运行在一个可用的物理处理器上的时候,goroutine 才会并行运行。

1
2
runtime.GOMAXPROCS(2) //分配2个逻辑处理器给调度器使用
runtime.GOMAXPROCS(runtime.NumCPU()) //给每个可用的核心分配一个逻辑处理器

写并发程序时的潜在危险,以及需要注意的事情:

  • 竞争状态

如果两个或者多个groutine在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于互相竞争的状态,这种情况被称为竞争状态

同一时刻只能有一个goroutine对共享资源进行读和写操作。

Go 语言有一个特别的工具,可以在代码里检测竞争状态。

1
$ go build -race   //用竞争检测器标志来编译程序

种修正代码、消除竞争状态的办法是,使用 Go 语言提供的锁机制,来锁住共享资源,从而保证 goroutine 的同步状态。

  • 锁住共享资源

Go 语言提供了传统的同步 goroutine 的机制,就是对共享资源加锁。如果需要顺序访问一个整型变量或者一段代码,atomic 和 sync 包里的函数提供了很好的解决方案。

  • 原子函数(atomic)
1
2
3
import "sync/atomic"
//安全的队counter加1
atomic.Addint64(&counter,1)

程序使用了 atmoic 包的 AddInt64 函数。这个函数会同步整型值的加法,方法是强制同一时刻只能有一个 goroutine 运行并完成这个加法操作。

另外LoadInt64和StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式。

  • 互斥锁(mutex)

互斥锁用于在代码上创建一个临界区,保证同一时间只有一个goroutine可以执行这个临界区代码。

1
2
3
4
5
import sync
var mutex sync.Mutex //mutex用来定义一段代码临界区

mutex.Lock() {} //同一时刻只能允许一个goroutine进入这个临界区
mutex.Unlock() //释放锁,允许其他正在等待的goroutine进入临界区

使用大括号只是为了让临界区看起来更清晰,并不是必需的。

  • 通道☆

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。

声明通道时,需要指定将要被共享的数据的类型,可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。

在Go语言中需要内置函数make来创建一个通道。

1
2
3
4
//无缓冲的整型通道
unbuffered := make(chan int)
//有缓冲的字符串通道
bufferd := make(chan string,10)

想通道发送值或者指针需要用到<-操作符

1
2
3
4
//有缓冲的字符串通道
buffered := make(chan string,10)
//通过通道发送一个字符串
buffered <- "Gopher"

为了让另一个 goroutine 可以从该通道里接收到这个字符串,我们依旧使用<-操作符,但这次是一元运算符

1
2
//从通道接收一个字符串
value := <-buffered
  • 无缓冲的通道

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。

这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收的操作。

如果没有同时准备好,goroutine就会阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//这个示例程序展示如何用无缓冲的通道来模拟2个goroutine间的网球比赛
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

var wg sync.WaitGroup

func init(){
rand.Seed(time.Now().UnixNano())
}
//main
func main(){
//创建一个无缓冲的通道
court := make(chan int)

//计数加2,表示要等待两个goroutine
wg.Add(2)

//启动两个选手
go player("Nadal",court)
go player("Djokovic",court)

//发球
court <- 1
//等待游戏结束
wg.Wait()
}
//模拟一个选手在打球
func player(name string,court chan int){
//在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
for {
//等待球被几打过来
ball,ok := <- court
if !ok{
//如果通道被关闭,我们就硬了
fmt.Printf("Player %s Won\n",name)
return
}
//选随机数,然后用这个随机数判断是否丢球
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n",name)
//关闭通道,表示我们输了
close(court)
return
}
//显示击球数,并+1
fmt.Printf("Player %s Hit %d\n",name,ball)
ball++
//将球打向对手
court <- ball
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Output:
Player Djokovic Hit 1
Player Nadal Hit 2
Player Djokovic Hit 3
Player Nadal Hit 4
Player Djokovic Hit 5
Player Nadal Hit 6
Player Djokovic Hit 7
Player Nadal Hit 8
Player Djokovic Hit 9
Player Nadal Hit 10
Player Djokovic Hit 11
Player Nadal Hit 12
Player Djokovic Hit 13
Player Nadal Hit 14
Player Djokovic Hit 15
Player Nadal Hit 16
Player Djokovic Hit 17
Player Nadal Hit 18
Player Djokovic Missed
Player Nadal Won
  • 有缓冲的通道

有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。

通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞

无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//这个示例程序展示如何使用有缓冲的通道和固定数目的goroutine来处理一堆工作
package main

import(
"fmt"
"math/rand"
"sync"
"time"
)
//常量定义
const (
numberGoroutines = 4 //要使用的goroutine的数量
taskLoad = 10 //要处理的工作的数量
)

//wg 用来等待程序完成
var wg sync.WaitGroup

//init初始化
func init(){
//初始化随机数种子
rand.Seed(time.Now().Unix())
}
//main
func main(){
//创建一个有缓存的通道来管理工作
tasks := make(chan string,taskLoad)
//启动goroutine来处理工作
wg.Add(numberGoroutines)
for gr := 1; gr <= numberGoroutines; gr++ {
go worker(tasks,gr)
}
//增加一组要完成的任务
for post := 1; post <= taskLoad; post++{
tasks <- fmt.Sprintf("Task:%d",post)
}
//当前所有工作都处理完毕关闭通道
close(tasks)
wg.Wait()
}
//WORKEER作为goroutine启动来处理从有缓冲的通道传入的工作
func worker(tasks chan string,worker int){
//通知函数已经返回
defer wg.Done()
for {
//等待分配工作
task,ok := <- tasks
if !ok {
fmt.Printf("Worker:%d:Shutting Down\n",worker)
return
}
//显示我们要开始工作了
fmt.Printf("Worker:%d:Started %s\n",worker,task)
//随机等待一段时间来模拟工作
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)
//显示工作完成
fmt.Printf("Worker: %d : Completed %s \n",worker,task)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Output:
Worker:4:Started Task:4
Worker:3:Started Task:3
Worker:2:Started Task:2
Worker:1:Started Task:1
Worker: 1 : Completed Task:1
Worker:1:Started Task:5
Worker: 2 : Completed Task:2
Worker:2:Started Task:6
Worker: 4 : Completed Task:4
Worker:4:Started Task:7
Worker: 1 : Completed Task:5
Worker:1:Started Task:8
Worker: 3 : Completed Task:3
Worker:3:Started Task:9
Worker: 4 : Completed Task:7
Worker:4:Started Task:10
Worker: 3 : Completed Task:9
Worker:3:Shutting Down
Worker: 2 : Completed Task:6
Worker:2:Shutting Down
Worker: 4 : Completed Task:10
Worker:4:Shutting Down
Worker: 1 : Completed Task:8
Worker:1:Shutting Down

当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。

有缓冲的通道和无缓冲的通道的例子很好地展示了如何编写使用通道的代码。在下一章,我们会介绍真实世界里的一些可能会在工程里用到的并发模式。

  • 小结

并发是指goroutine运行的时候是相互独立的。

使用关键词go创建goroutine来运行函数。

goroutine在逻辑处理器上执行,而逻辑处理器具有独立的系统线程和运行队列。

竞争状态是指两个或者多个goroutine试图访问同一个资源。

原子函数和互斥锁提供了一种防止出现竞争状态的办法。

通道提供了一种在两个goroutine之间共享数据的简单方法。

无缓冲的通道保证同时交换数据,而有缓冲的通道不做这种保证。

本文最后更新于 天前,文中所描述的信息可能已发生改变