从本文中将学到
-
什么是并发以及并行。 -
什么是数据竞争和死锁,如何去避免。 -
goroutine,channel,select,waitGroup,mutex。 -
如何使用这些工具编写并发程序。
涵盖的技术概念
-
Mutex -
Channel -
Concurrency -
Parallelism -
Wait group -
Deadlock -
Data Race -
Select statement
定义
进程
进程是当前由计算机执行的程序的实例。 如果您想查看计算机上运行的进程列表,您可以打开终端并运行以下命令(在 UNIX 系统上):
ps aux
您将看到计算机上正在运行的进程的完整列表。该命令将输出以下信息:
-
USER :启动进程的用户。 -
PID:进程的ID(计算机上的每个进程都有一个唯一的ID,因此可以很容易地识别一个进程)。 -
%CPU : 将显示进程的CPU使用百分比。 -
%MEM :已使用内存的百分比。 -
COMMAND :进程的命令,包括参数。
ps 的输出示例这里:
-
我们有两个流程 -
它们都是使用相同的命令 ./programFoo -
启动的它们有两个不同的进程 ID:65865 和 65689。 -
两者均于今天 11:37 和 11:38 推出
在 Windows 上,您可以输入:
tasklist
执行线程
线程代表一组指令的顺序执行。单个进程可以创建多个线程。 对于每个进程,我们至少有一个执行线程。 通过创建多个线程,我们创建了多个可以共享一些数据的执行流。
什么是并发
“并发是指程序、算法或问题的不同部分或单元能够在无序或部分顺序的情况下执行,而这并不会影响最终结果的能力。”
-
并发性是指一种能力。 -
Go 支持并发。 -
我们可以编写一个可以同时运行一组任务的 Go 程序。
什么是并行
先前的并发概念可能与并行性混淆。“并发不是并行” , 并行是指同时(同时)执行的任务。并发程序可能并行运行任务。
并发陷阱
数据竞争
定义
如果两个(或更多)执行线程访问共享变量,则可能会发生数据竞争:
-
当一个(或多个)线程想要更新变量时; -
线程中没有“阻止同时访问的显式机制”。
举例
想象一下,您开发了一个并发处理请求的电子商务网站。 您同时有两个客户端对同一种产品感兴趣:电脑屏幕。产品的当前库存存储在数据库中。让我们一步步来看我们的例子:
-
在时间 0,John 和 Jeanne 加载相同的产品页面。 -
在时间 1 John 按下订单按钮 -
当John单击订单按钮(购买商品)时,我们的脚本将检查数据库中的库存。 -
那时我们有 1 个库存(参见最后一列),我们在数据库中启动更新过程,这将需要两个时间单位。 -
在时间 3 和 4,产品正在更新。因此,在时间 5 开始时,数据库中的产品库存为 0… -
而Jeanne也按下了订单按钮。库存的更新也在时间4启动。
这种情况下,卖给Jeanne的东西实际上为空。这是一场数据竞赛。发生这种情况是因为我们有两个线程(John 和 Jeanne)想要以写入方式访问同一个共享变量,并且我们没有机制来阻止同时访问。…
数据竞争检测器
当您构建和测试程序时,您可以使用 Go 数据竞争检测。它将检测潜在的错误:
go build -race -o myProgramName main.go
死锁
定义
当程序的两个组件互相等待时,就会发生死锁。在这种情况下,整个系统都会被阻塞。整个计划的执行无法取得任何进展。
举例:哲学家就餐问题
这个问题最初是由 Edsger W. Dijkstra 创建的。问题表述如下:
-
我们有一个圆桌, -
5 位哲学家就座, -
每个哲学家面前都有一个意大利面盘, -
每个盘子之间都有一个叉子。
就餐规则如下:
-
吃饭时,客人必须有两把叉子 -
客人只能使用左右叉
我们需要为每个哲学家设计一个程序。解决方案可以如下:
-
重复直到用餐完毕: -
当右叉可用时,拿走它, -
当左叉可用时,拿走它, -
当你拥有两把叉子时,开始吃 100 克意大利面, -
释放两个叉子。
让我们将程序加载到每个哲学家的大脑中并启动晚餐。每个哲学家都将成为我们主“晚餐”程序中的一个线程。 当晚餐开始时,将触发以下操作:
-
哲学家 1 将拿走叉子 I(因为它可用) -
哲学家2会拿叉子II -
哲学家3将拿叉子III -
哲学家4将拿叉子 -
IV哲学家5会拿叉子V
在这种情况下,每个哲学家的程序都执行我们程序的第一条指令:
When the right fork is available take it
然后是第二条指令:
When the left fork is available take it
无法执行,因为左叉不可用。该程序被阻止。这是一个僵局!
Goroutine
Go 中创建并发系统的主要工具是 goroutine。
主协程
每个程序都有一个 goroutine,这是主 goroutine。为了演示它,让我们构建一个简单的程序并使其出现恐慌。我们将在堆栈跟踪(错误消息)中看到它后面有一个 goroutine:
// concurrency/main-goroutine/main.go
package main
func main(){
panic("show me the goroutine")
}
让我们构建程序(go build)并启动可执行文件。我们有以下堆栈跟踪:
panic: show me the goroutine
goroutine 1 [running]:
main.main()
说明:有一个 goroutine。主 Goroutine 的索引为 1。
官方定义
Goroutine 是一个独立于程序其余部分执行的函数。Goroutine 的基本元素是函数。每个函数都可以成为 goroutine。启动 goroutine 与启动函数一样简单,只不过您只需在函数调用之前添加“go”一词即可。
举例
定义一个函数 printNumber :
func printNumber(){
i :=0
for {
time.Sleep(1 * time.Second)
i++
fmt.Println(i)
}
}
-
首先将变量 i 初始化为值 0。 -
然后它开始无限循环(使用 for 指令)。 -
在这个无限循环中,我们使用 time.Sleep(1 * time.Second) 让程序暂停一秒钟。 -
之后我们增加 i (i++) -
然后我们打印 i之后
我们声明我们的主函数(主 goroutine):
func main(){
fmt.Println("launch goroutine")
go printNumber()
fmt.Println("launch goroutine")
go printNumber()
time.Sleep(1 * time.Minute)
}
该程序的执行将输出以下内容:
launch first goroutine
launch second goroutine
1
1
2
2
3
3
4
4
Channel
Channel是带有类型的通信管道
Goroutine 可以通过Channel相互通信。Channel可以看作是两个 goroutine 之间的数据管道。该管道只能支持特定类型。
Channel类型
Channel可以是:
-
仅发送 -
仅接收 -
双向(可以发送或接收)
语法
-
可用于发送 T 类型值的通道表示为:chan<- T -
可用于接收 T 类型值的通道表示为:<-chan T -
可用于发送和接收 T 类型值的通道表示为:chan T
★
记忆点,看<- 符号指向即可。
”
Channel类型的零值为 nil。
Channel初始化,容量
channel是用 make 内置函数初始化的: 为了初始化 int 的双向无缓冲通道,您可以使用以下代码:
ch1 := make(chan int)
为了初始化字符串的双向缓冲通道,您可以使用以下代码:
ch2 := make(chan string, 3)
3是channel的容量。这是 Go 分配的空间,用于存储发送到通道的值。
无缓冲Channel
如果您在创建Channel时未指定其容量,则该Channel是无缓冲的。大小为零的Channel也是无缓冲的。 要创建无缓冲Channel,您可以使用以下代码源:
ch3 := make(chan float)
有缓冲Channel
缓冲Channel是您在创建通道时指定缓冲区大小的通道。
ch6 := make(chan float, 16)
发送数据
假设我们有一个名为 ch5 的Channel。要将元素发送到此Channel,我们将使用箭头语法:<-。 这两个字符传达了数据从右向左流动的概念。
package main
func main() {
ch5 := make(chan int, 2)
ch5 <- 42
}
在前面的片段中:
-
我们初始化一个双向缓冲整数Channel(ch5), -
然后我们将数字 42 发送到通道,语法如下:Channel <- 表达式。
发送语句的特殊性
发送语句有一些特定的规则:
-
在通信之前评估Channel和表达式 -
如果频道开放,您可以在频道上发送。如果您在封闭的频道上发送,您的程序将会出现恐慌! -
如果您在零通道上发送,它会永远阻止您的程序。
关闭Channel
可以使用内置的 close 来关闭Channel。关闭通道表示“Channel上不会再发送任何值”。
-
无法关闭仅接收Channel。 -
无法在已关闭的Channel上发送数据。 -
无法关闭已经关闭的Channel。 -
可以在已关闭Channel上接收数据(请参阅下一节)
package main
import "log"
func main() {
var received int
ch5 := make(chan int, 2)
ch5 <- 42
close(ch5)
}
这里我们将值 42 发送到Channel中,然后使用 close 内置函数关闭它:close(ch5)。
接收数据
package main
import "log"
func main() {
var received int
ch5 := make(chan int, 2)
ch5 <- 42
ch5 <- 41
received = <-ch5
log.Println(received)
}
在此代码片段中:
-
我们创建一个整数缓冲通道:ch5; -
我们将值 42 和 41 发送到通道中; -
我们将收到的值设置为<-ch5; -
我们打印收到的值,前面的脚本将输出:42。
多值接收操作
使用此语法是为了确保我们的通道没有关闭或为空:
x, ok = <-ch
if !ok {
log.Println("channel is empty or closed")
}
在这个例子中,我们有变量 x 来保存通道中发送的值,但我们添加了一个 ok 变量来帮助我们确定通道是空的还是关闭的。ok 的值是一个布尔值,等于
-
true :一切正常,我们收到了一个值。 -
false :如果通道为空或关闭。
package main
import "fmt"
func main() {
ch5 := make(chan int, 2)
ch5 <- 42
close(ch5)
received, ok := <-ch5
fmt.Println(received, ok)
}
/*
output: 42 , true
*/
语法总结
轻松记忆
-
当箭头指向通道时,我们将数据发送到通道中; -
否则我们从通道接收数据。
如何造成死锁
带有缓冲通道
// concurrency/deadlock/main.go
package main
import (
"log"
)
func main() {
ch := make(chan int, 1)
go dummy(ch)
log.Println("waiting for reception...")
ch <- 45
ch <- 58
ch <- 100
}
func dummy(c chan int) {
smth := <-c
log.Println("has received something", smth)
}
此代码将导致解锁。这是执行结果:
2021/02/16 11:19:57 waiting for reception...
2021/02/16 11:19:57 has received something 45
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/Users/maximilienandile/Documents/DEV/goBook/concurrency/sendBlocking/main.go:15 +0xea
Process finished with exit code 2
-
我们创建了一个容量等于 1 的缓冲通道 -
该通道被传递给一个新的 goroutine,它将接收该通道上的数据(dummy函数)。 -
我们在通道上发送三个值:45、58 和 100。 -
第一个值由虚拟 goroutine 接收。 -
缓冲区上有空间可以存储。 -
第二个值当我们在通道上发送第三个值时,主 Goroutine 将被阻塞它会被阻塞, -
直到第三个值被复制到通道缓冲区。 -
该程序将无限期地等待。
使用无缓冲通道
// concurrency/deadlock-unbuffered/main.go
package main
func main() {
ch := make(chan int)
ch <- 5
}
这个简单的程序会导致死锁。主 goroutine 无限期地等待,直到接收者收到发送的数据。
Select
Select 语句用于选择“将进行一组可能的发送或接收操作中的哪一个”。
-
select 语句类似于 switch case 语句,但用于通信操作。 -
在 select 语句中,有case和可选的Default case。 -
将选择第一个非阻塞情况。 -
如果 2 个或更多情况没有阻塞,则通过“均匀伪随机”选择来选择单个情况。(随机选!不是顺序!) -
如果所有情况都阻塞,则选择默认情况。
没有Default的情况
package main
import "log"
func main() {
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "test"
select {
case rec, ok := <-ch1:
if ok {
log.Printf("received on ch1 : %s", rec)
}
case rec, ok := <-ch2:
if ok {
log.Printf("received on ch2 : %s", rec)
}
}
log.Println("end")
}
-
我们有一个包含 2 个接收操作的 select 语句。 -
我们正在等待 ch1 和 ch2 的接收。 -
选择第一个非阻塞情况 => 这里是第一个情况 case rec, ok := <-ch1。
ch1 是第一个接收消息的通道。这是该程序的输出:
2021/02/16 17:05:27 received on ch1 : test
2021/02/16 17:05:27 end
Default
// concurrency/select-with-default/main.go
package main
import "log"
func main() {
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
select {
case rec, ok := <-ch1:
if ok {
log.Printf("received on ch1 : %s", rec)
}
case rec, ok := <-ch2:
if ok {
log.Printf("received on ch2 : %s", rec)
}
default:
log.Println("default case")
}
log.Println("end")
}
-
选择默认情况; -
两个通道上尚未发送; -
任何内容select语句的两种情况都是阻塞的,通信操作无法进行。
空选择语句
package main
func main() {
select {}
}
这个程序会造成死锁。
通道和选择的一些常见用例
我将在本节中详细介绍一些我觉得有趣的通道和选择的常见用例。
HTTP 服务器的正常关闭
如果程序实现了“优雅关闭”机制,则它能够以干净且有序的方式停止。 该机制必须检测程序是否已被中断。os/signal 包有一个名为 Notify 的专用函数,可以检测操作系统信号,例如“中断”。 检测到这样的信号后,程序需要实现特定的关闭逻辑(保存一些状态,删除一些临时对象,关闭一些连接……)在本节中,我们将以 HTTP 服务器为例:
// concurrency/server-graceful-shutdown/main.go
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
// 创建通知信号
bye := make(chan os.Signal)
signal.Notify(bye, os.Interrupt, syscall.SIGTERM)
mux := http.NewServeMux()
mux.Handle("/status", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK")
},
))
srv := &http.Server{
Addr: ":8081",
Handler: mux,
}
// 在另一个协程中启动服务
go func() {
// 加载服务
err := srv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.Fatalf("server: %qn", err)
}
}()
// 等待关闭信号
sig := <-bye
// the code below is executed when we receive an os.Signal
log.Printf("detected os signal %s n", sig)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
err := srv.Shutdown(ctx)
cancel()
if err != nil {
log.Fatal(err)
}
}
-
当程序接收到以下信息时,signal.Notify 将使用通道 bye 通知我们的程序。 -
中断信号 os.Interrupt -
SIGTERM 信号 :os.SIGTERM -
创建了一个具有一条路由“/status”的经典服务器。 -
服务器启动到一个新的 goroutine。 -
在主 goroutine 上,我们等待潜在的操作系统信号 ; -
sig := <-bye收到信号后,我们执行以下代码:
log.Printf("detected os signal %s n", sig)
err := srv.Shutdown(context.Background())
if err != nil {
log.Fatal(err)
}
-
我们调用 srv.Shutdown 它将正常关闭服务器。 -
它将“首先关闭所有打开的侦听器,然后关闭所有空闲连接,然后无限期地等待连接返回空闲状态,然后关闭”。
超时
超时是事件发生之前允许的时间量。当您的程序依赖于可能不可用的外部资源时,实现超时可能会很有趣。如果没有超时,您的程序可能会无限期地等待资源。通过 select 语句,我们可以添加超时情况:
// concurrency/timeout/main.go
package main
import (
"log"
"time"
)
func main() {
ch := make(chan int, 1)
select {
case rec, ok := <-ch:
if ok {
log.Printf("received %d", rec)
}
case rec, ok := <-time.After(time.Second * 3):
if ok {
log.Printf("operation timed out at %s", rec)
}
}
}
-
time.After 返回 time.Time 元素的仅接收通道 -
3秒后发生超时。
WaitGroup
WaitGroup是标准库提供的同步工具。它可用于等待一组 goroutine 完成任务。
// concurrency/wait-group-without/main.go
package main
import (
"fmt"
"time"
)
func main() {
fmt.Printf("Program start n")
for i := 0; i < 10; i++ {
go concurrentTaks(i)
}
finishTask()
fmt.Printf("Program end n")
}
func finishTask() {
fmt.Println("Executing finish task")
}
func concurrentTaks(taskNumber int) {
fmt.Printf("BEGIN Execute task number %d n", taskNumber)
time.Sleep(100 * time.Millisecond)
fmt.Printf("END Execute task number %d n", taskNumber)
}
-
我们将启动 10 个 goroutine,它们将调用函数并发任务(i),其中 i 是循环计数器的索引。 -
启动这些 goroutine 后,我们将调用函数 finishTask
前面的程序会输出:
Program start
Executing finish task
Program end
看来我们甚至没有启动我们的 goroutine!为什么 ?那是因为启动 goroutine 不会阻塞主 goroutine ……我们可以修改源代码来添加WaitGroup:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
fmt.Printf("Program start n")
// initialize the wait group
var waitGroup sync.WaitGroup
waitGroup.Add(10)
for i := 0; i < 10; i++ {
go concurrentTaks(i, &waitGroup)
}
waitGroup.Wait()
finishTask()
fmt.Printf("Program end n")
}
func finishTask() {
fmt.Println("Executing finish task")
}
func concurrentTaks(taskNumber int, waitGroup *sync.WaitGroup) {
fmt.Printf("BEGIN Execute task number %d n", taskNumber)
time.Sleep(100 * time.Millisecond)
fmt.Printf("END Execute task number %d n", taskNumber)
waitGroup.Done()
}
此代码片段将输出以下内容:
Program start
BEGIN Execute task number 3
BEGIN Execute task number 9
BEGIN Execute task number 7
...
END Execute task number 9
END Execute task number 7
END Execute task number 3
Executing finish task
Program end
我们得到了预期的结果。我们的 goroutine 已经执行了。让我们看看发生了什么变化。
-
在程序的开头,我们创建了一个类型为sync.WaitGroup的变量. -
我们向等待组指示他需要等待 10 个工作单元完成。 -
WaitGroup组的内部计数器将递增。 -
然后我们将等待组的指针传递给我们的 goroutine。
go concurrentTaks(i, &waitGroup)
函数并发任务也被修改(它的签名,也是它的主体):
func concurrentTaks(taskNumber int, waitGroup *sync.WaitGroup) {
fmt.Printf("BEGIN Execute task number %d n", taskNumber)
time.Sleep(100 * time.Millisecond)
fmt.Printf("END Execute task number %d n", taskNumber)
waitGroup.Done()
}
-
我们在函数末尾调用 waitGroup.Done() 方法。 -
Done 方法将减少内部计数器。
当内部计数器达到 0(意味着所有 goroutine 都已完成)时,主 goroutine 被释放;
for i := 0; i < 10; i++ {
//...
}
waitGroup.Wait() // we block the main goroutine (to wait for the previous goroutines to finish)
finishTask()
fmt.Printf("Program end n")
你需要记住什么
-
创建一个WaitGroup:
var waitGroup sync.WaitGroup
-
增加WaitGroup计数器
waitGroup.Add(1)
-
减少 wait group计数器(goroutine完成):
waitGroup.Done()
-
阻塞当前的 goroutine 直到所有 goroutine 都完成:
waitGroup.Wait()
-
必须将waitGroup的指针传递给您的 goroutine。
应用:WaitGroup和Channel
任务
电子商务网站将每次访问保存在数据库中。对于每次访问,我们都有:
-
唯一 ID (UUID v4): 例如:5ce7fef2-c53a-4b79-bcbb-669c9799fa86; -
访问的页面(字符串)例如:/product-6; -
访问日期; -
会话哈希例如:8722b6f78a69aeac3736bfcaa1dd7e4e7a77834dec2adf27e007ed1c998b34df,它用于按会话对页面视图进行分组。
营销团队想要列出最受欢迎的产品及其相关性能。 他们希望获得每个日期每个页面的访问次数:
-
日期 : 2020 年 1 月 25 日 : -
/product-45 : 254 次访问 -
/product-22 : 2345 次访问 -
/home :2000 次访问。…… -
日期 : 2020 年 1 月 30 日 : -
/product-45 : 125 次访问 -
/home :2000 次访问。
…查询访问数据库后,您提取了一个具有以下格式的 JSON 文件:您需要构建一个具有以下输入/输出的程序:
-
INPUT :与前面的代码片段类似的 JSON 文件。 -
您可以在此处下载示例 JSON 文件。 -
输出:以 JSON 格式显示的每天统计数据。输出示例:
建议
-
需要将 INPUT 文件的内容加载到内存中。 -
还需要解组加载的 JSON 数据。 -
考虑如何将工作分成多个独立的任务。 -
创建一个处理单个任务的工作函数。 -
worker可以通过通道与您的主 Goroutine 进行通信。
解决方案
加载输入以及解析Json
// concurrency/application1/visit/visit.go
package visit
type Visit struct {
ID string `json:"id"`
Page string `json:"page"`
SessionHash string `json:"sessionHash"`
}
该结构类型被放入新的包访问中。
// concurrency/application1/main.go
data, err := ioutil.ReadFile("data.json")
if err != nil {
log.Fatal(err)
}
dayStats := make(map[string][]visit.Visit)
err = json.Unmarshal(data, &dayStats)
if err != nil {
log.Fatal(err)
}
如何分工
我们可以按日期划分工作。每个每日访问量都可以独立处理。我们创建一个任务结构:
type Task struct {
Date string
Visits []visit.Visit
}
我们将通过输入通道为每个worker提供一个 Task 类型的变量。
输出结构
我们程序的输出是每日统计数据的一部分。我们创建一个类型结构 DailyStat 来保存特定日期的统计数据:
type DailyStat struct {
Date string `json:"date"`
ByPage map[string]int `json:"byPage"`
}
最终输出的类型为 []DailyStat(前一个结构的切片)。
创建waitGroup/Channel
var w8 sync.WaitGroup
w8.Add(len(dayStats))
inputCh := make(chan Task, 10)
outputCh := make(chan DailyStat, len(dayStats))
-
我们初始化一个等待组:w8 -
初始化后,我们将值 numberOfWorkers 添加到其内部计数器中。 -
通过这样做,我们准备了等待每个Worker完成的程序。 -
我们创建两个双向缓冲通道。
inputCh 将用于将任务发送给工作人员。
-
该通道的缓冲区为 10 -
通过将缓冲区大小设置为 10,我们将控制“吞吐量”(处理任务的速率) -
主goroutine最多可以向通道发送10个任务 -
当发送 10 个任务时,主 Goroutine 将阻塞,直到可以添加新任务。
outputCh 将用于将worker的结果传达给主 goroutine。
Worker
// concurrency/application1/main.go
func worker(in chan Task, workerId int, out chan DailyStat, w8 *sync.WaitGroup) {
for received := range in {
m := make(map[string]int)
for _, v := range received.Visits {
m[v.Page]++
}
out <- DailyStat{
Date: received.Date,
ByPage: m,
}
fmt.Printf("[worker %d] finished task n", workerId)
}
// when the channel is closed the for loop is exited
log.Println("worker quit")
w8.Done()
}
worker是一个函数。它将输入通道和输出通道作为输入。
-
通过 for 循环,我们可以遍历从输入通道接收到的消息。 -
每条消息都是一个 Task 类型的变量. -
通过map,我们将计算每个页面的访问次数。 -
key:页面名称(例如:/product-8). -
value:访问次数(例如:458) -
一旦统计了访问次数,我们就会将结果发送到输出通道。 -
当关闭信号发送到输入通道时, -
for循环将退出最后waitGroup计数器减一:w8.Done()。 -
我们向主 Goroutine 发出信号,表明一个 Goroutine 已完成其任务。
创建 Worker 并将任务发送到输入通道
// create the workers
for k := 0; k < numberOfWorkers; k++ {
go worker(inputCh, k, outputCh, &w8)
}
// send the tasks
for date, visits := range dayStats {
inputCh <- Task{
Date: date,
Visits: visits,
}
}
// we say that we will not send any new data on the input channel
close(inputCh)
// wait for all tasks to be completed
w8.Wait()
-
用 for 循环创建worker. -
迭代 dayStats 将任务发送到输入通道。 -
当所有任务发送完毕后,我们关闭输入通道。
w8.Wait() 将等待所有工作人员完成。
收集结构
// when all treatment is finished we close the output channel
close(outputCh)
// collect the result
done := make([]DailyStat, 0, len(dayStats))
for out := range outputCh {
done = append(done, out)
}
res, err := json.Marshal(done)
if err != nil {
log.Fatal(err)
}
err = ioutil.WriteFile("results.json", res, 0644)
if err != nil {
log.Fatal(err)
}
log.Println("done !")
-
最后一部分很简单 -
我们关闭输出通道 -
然后我们从输出通道收集结果(使用 for 循环) -
每个结果都附加到完成的切片中.
然后done 会被编组为 JSON,并将结果保存到 result.json 文件中。
Mutex
同步工具
互斥锁是一种同步工具。互斥量是互斥的缩写。让我们绕道逻辑理论来更好地理解这个概念。如果两个事件不能同时发生,我们就说它们是互斥的。例如,“自行车向左转弯”和“自行车向右转弯”这两个事件是互斥的。骑自行车的人不能同时左转和右转。 这种互斥性质对我们来说很有趣。当我们构建并发程序时,可能会发生数据竞争。如果两个(或更多)执行线程访问共享变量,则可能会发生数据竞争并且当一个(或多个)线程想要更新变量时并且线程 [@savage1997eraser] 中没有“阻止同时访问的显式机制”。我们可以通过使用互斥体来避免数据竞争。我将向您展示如何:
举例
// concurrency/mutex-bug/main.go
package main
import (
"fmt"
"log"
"net/http"
)
var requestCount int
func main() {
http.HandleFunc("/status", status)
err := http.ListenAndServe(":8090", nil)
if err != nil {
log.Fatal(err)
}
}
func status(w http.ResponseWriter, req *http.Request) {
requestCount++
fmt.Fprintf(w, "OK - count : %d n", requestCount)
}
-
我们在这里创建了一个简单的 HTTP Web 服务器。 -
它只有一个路由:“/status” -
我们创建了一个int类型的全局变量requestCount -
每次将请求发送到路由“/status”时,该变量都会递增。
让我们创建另一个程序来调用该服务器……
// concurrency/mutex-server-call/main.go
package main
import (
"io/ioutil"
"log"
"net/http"
"sync"
)
func main() {
var w8 sync.WaitGroup
w8.Add(10)
for k := 0; k < 10; k++ {
go caller(&w8)
}
w8.Wait()
}
func caller(w8 *sync.WaitGroup) {
for k := 0; k < 100; k++ {
res, err := http.Get("http://localhost:8090/status")
if err != nil {
log.Fatal(err)
}
defer res.Body.Close()
s, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatal(err)
}
log.Println(string(s))
}
w8.Done()
}
该程序将启动 10 个 goroutine,每个 goroutine 会同时向服务器发送 100 个请求(10 * 100 = 1,000 个请求)。请注意,我们使用等待组来等待所有 goroutine 完成。 该程序将打印每个请求收到的正文。这是最后 3 行:
//...
2021/02/17 18:43:49 OK - count : 989
2021/02/17 18:43:49 OK - count : 990
2021/02/17 18:43:49 OK - count : 991
哇 !我们提出了 1,000 个请求,但我们的柜台显示 991 …
添加Mutex
// concurrency/mutex-added/main.go
package main
import (
"fmt"
"log"
"net/http"
"sync"
)
var mu sync.Mutex
var requestCount int
func main() {
http.HandleFunc("/status", status)
err := http.ListenAndServe(":8091", nil)
if err != nil {
log.Fatal(err)
}
}
func status(w http.ResponseWriter, req *http.Request) {
mu.Lock()
requestCount++
mu.Unlock()
fmt.Fprintf(w, "OK - count : %d n", requestCount)
}
-
我创建了一个名为 mu 的新全局变量,其类型为sync.Mutex。 -
在Stateus函数中,我们在增量前后添加了 2 行新行。 -
在递增之前我们调用 mu.Lock()。 -
在递增之后调用 mu.Unlock() 。
如果我们测试该程序,计数器这次将按预期工作。sync.Mutex API 互斥 API 很简单。它有 2 个导出方法: Lock() :该方法将尝试获取锁,如果该锁已被另一个 goroutine 使用,它将阻塞直到它被释放。 Unlock() :该方法将释放锁。如果互斥体之前没有被锁定,它将生成运行时错误。
Mutex Hat
有一个非常常见的习惯用法,称为 Mutex hat。在前面的示例中,我们创建了一个全局互斥锁。不建议这样做。互斥锁通常用在结构类型中。
type DB struct {
//...
mu sync.Mutex // protects following fields
freeConn []*driverConn
connRequests map[uint64]chan connRequest
nextRequest uint64 // Next key to use in connRequests.
numOpen int // number of opened and pending open connections
//...
}
这里互斥体 mu 保护 4 个字段。它被放置在这些字段的顶部(像帽子一样)。
并发程序设计
设计并发程序并不容易。这需要一些经验,但也需要一些方法论。在本节中,我们将详细介绍一些可用于编写并发程序的技巧。 我们将使用 T. Mattson 在其优秀著作 [@mattson2004patterns] 中提出的一些概念和想法。
确定任务
任务是一个操作(可以由子操作组成)。任务可以由以下部分组成:
-
输入/输出操作 -
读取文件 -
打开 TCP 连接 -
“计算”操作 -
获取整数除法的余数 -
迭代切片以查找特定元素索引。
任务可以是单个函数、一个方法或一组函数和方法。
输入输出
在考虑一项任务时,清楚地隔离可能会有所帮助:
-
输入:任务中使用的数据,举例 : -
整数切片 -
map -
指向变量的指针 -
输出:这是任务返回的内容。举例 : -
输入数据的修改版本。 -
计算结果。 -
日期、字符串、持续时间。
确认依赖
依赖关系是两个或多个任务之间的链接。我们可以描述两种类型的依赖关系:
-
顺序依赖性:任务需要在另一任务之前或之后执行。 -
数据依赖:两个任务的执行依赖于同一条数据。
顺序依赖性示例
通常顺序依赖性很容易理解和检测。例如,让我们以电子商务网站为例。我们隔离了以下两项任务:
-
处理订单的付款 -
运送包裹
很明显,我们需要在发货之前处理付款。这两个任务之间存在顺序依赖性。一项任务需要在另一项任务之前完成。
数据依赖示例
让我们继续以电子商务软件为例。通常此类应用程序存储产品详细信息和每种产品的库存。您有以下任务:
-
下订单后减少库存。 -
读取库存以显示在网站前端。
这两项任务取决于库存数据。在第一个任务(减少库存)中,我们更新它。在第二个中,我们刚刚读了它。这两个操作取决于库存变量。如果您在阅读股票的同时更新它,会发生什么?您可能会向客户显示错误的股票信息。如果您不实施防止数据竞争的机制,数据依赖性会导致数据竞争。
建模
第一步是确定:
-
任务 -
任务输入/输出 -
任务之间的依赖关系
下一步是使用这些信息来构建您的程序。
-
如果你的任务之间没有依赖性,那么你就处于理想状态。您的所有任务都可以同时执行。 -
如果您注意到订单依赖性,那么您可以建立一个工作链。 -
假设您有 3 组任务 -
我们可以创建 3 个worker -
worker将同时执行,但您必须确保任务将按照给定的顺序执行。 -
可以通过连接 3 个worker的 3 个通道来实现。 -
当您发现数据依赖性时,您的程序可能会受到数据竞争的影响。 -
添加互斥锁通常可以保护您免受此陷阱的影响。
当然这3种情况也可能同时出现。因此,编写并发程序可能会很困难。然而,映射任务和依赖关系肯定有助于设计过程。
原文始发于微信公众号(小唐云原生):【Practical-Go-Lessons】Go并发-基础
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/247560.html