【Practical-Go-Lessons】Go并发-基础

从本文中将学到

  • 什么是并发以及并行。
  • 什么是数据竞争和死锁,如何去避免。
  • goroutine,channel,select,waitGroup,mutex。
  • 如何使用这些工具编写并发程序。

涵盖的技术概念

  • Mutex
  • Channel
  • Concurrency
  • Parallelism
  • Wait group
  • Deadlock
  • Data Race
  • Select statement

定义

进程

进程是当前由计算机执行的程序的实例。 如果您想查看计算机上运行的进程列表,您可以打开终端并运行以下命令(在 UNIX 系统上):

ps aux

您将看到计算机上正在运行的进程的完整列表。该命令将输出以下信息:

  • USER :启动进程的用户。
  • PID:进程的ID(计算机上的每个进程都有一个唯一的ID,因此可以很容易地识别一个进程)。
  • %CPU : 将显示进程的CPU使用百分比。
  • %MEM :已使用内存的百分比。
  • COMMAND :进程的命令,包括参数。

【Practical-Go-Lessons】Go并发-基础ps 的输出示例这里:

  • 我们有两个流程
  • 它们都是使用相同的命令 ./programFoo
  • 启动的它们有两个不同的进程 ID:65865 和 65689。
  • 两者均于今天 11:37 和 11:38 推出

Windows 上,您可以输入:

tasklist

执行线程

线程代表一组指令的顺序执行。单个进程可以创建多个线程。 对于每个进程,我们至少有一个执行线程。 通过创建多个线程,我们创建了多个可以共享一些数据的执行流

什么是并发

“并发是指程序、算法或问题的不同部分或单元能够在无序或部分顺序的情况下执行,而这并不会影响最终结果的能力。”

  • 并发性是指一种能力。
  • Go 支持并发。
  • 我们可以编写一个可以同时运行一组任务的 Go 程序。

什么是并行

先前的并发概念可能与并行性混淆。“并发不是并行” , 并行是指同时(同时)执行的任务。并发程序可能并行运行任务。

并发陷阱

数据竞争

定义

如果两个(或更多)执行线程访问共享变量,则可能会发生数据竞争:

  • 当一个(或多个)线程想要更新变量时;
  • 线程中没有“阻止同时访问的显式机制”。

举例

想象一下,您开发了一个并发处理请求的电子商务网站。 您同时有两个客户端对同一种产品感兴趣:电脑屏幕。产品的当前库存存储在数据库中。【Practical-Go-Lessons】Go并发-基础让我们一步步来看我们的例子:

  • 在时间 0,John 和 Jeanne 加载相同的产品页面。
  • 在时间 1 John 按下订单按钮
  • 当John单击订单按钮(购买商品)时,我们的脚本将检查数据库中的库存。
  • 那时我们有 1 个库存(参见最后一列),我们在数据库中启动更新过程,这将需要两个时间单位
  • 在时间 3 和 4,产品正在更新。因此,在时间 5 开始时,数据库中的产品库存为 0…
  • 而Jeanne也按下了订单按钮。库存的更新也在时间4启动。

这种情况下,卖给Jeanne的东西实际上为空。这是一场数据竞赛。发生这种情况是因为我们有两个线程(John 和 Jeanne)想要以写入方式访问同一个共享变量,并且我们没有机制来阻止同时访问。…

数据竞争检测器

当您构建和测试程序时,您可以使用 Go 数据竞争检测。它将检测潜在的错误:

go build -race -o myProgramName main.go

死锁

定义

当程序的两个组件互相等待时,就会发生死锁。在这种情况下,整个系统都会被阻塞。整个计划的执行无法取得任何进展。

举例:哲学家就餐问题

这个问题最初是由 Edsger W. Dijkstra 创建的。问题表述如下:

  • 我们有一个圆桌,
  • 5 位哲学家就座,
  • 每个哲学家面前都有一个意大利面盘,
  • 每个盘子之间都有一个叉子。

【Practical-Go-Lessons】Go并发-基础就餐规则如下:

  • 吃饭时,客人必须有两把叉子
  • 客人只能使用左右叉

我们需要为每个哲学家设计一个程序。解决方案可以如下:

  • 重复直到用餐完毕:
    • 当右叉可用时,拿走它,
    • 当左叉可用时,拿走它,
    • 当你拥有两把叉子时,开始吃 100 克意大利面,
    • 释放两个叉子。

让我们将程序加载到每个哲学家的大脑中并启动晚餐。每个哲学家都将成为我们主“晚餐”程序中的一个线程。 当晚餐开始时,将触发以下操作:

  • 哲学家 1 将拿走叉子 I(因为它可用)
  • 哲学家2会拿叉子II
  • 哲学家3将拿叉子III
  • 哲学家4将拿叉子
  • IV哲学家5会拿叉子V

在这种情况下,每个哲学家的程序都执行我们程序的第一条指令:

When the right fork is available take it

然后是第二条指令:

When the left fork is available take it

无法执行,因为左叉不可用。该程序被阻止。这是一个僵局!

Goroutine

Go 中创建并发系统的主要工具是 goroutine。

主协程

每个程序都有一个 goroutine,这是主 goroutine。为了演示它,让我们构建一个简单的程序并使其出现恐慌。我们将在堆栈跟踪(错误消息)中看到它后面有一个 goroutine:

// concurrency/main-goroutine/main.go 
package main

func main(){
    panic("show me the goroutine")
}

让我们构建程序(go build)并启动可执行文件。我们有以下堆栈跟踪:

panic: show me the goroutine
goroutine 1 [running]:
main.main()

说明:有一个 goroutine。主 Goroutine 的索引为 1。

官方定义

Goroutine 是一个独立于程序其余部分执行的函数。Goroutine 的基本元素是函数。每个函数都可以成为 goroutine。启动 goroutine 与启动函数一样简单,只不过您只需在函数调用之前添加“go”一词即可。

举例

定义一个函数 printNumber :

func printNumber(){     
    i :=0   
    for {       
        time.Sleep(1 * time.Second)         
        i++         
        fmt.Println(i)  
    } 
}
  • 首先将变量 i 初始化为值 0。
  • 然后它开始无限循环(使用 for 指令)。
    • 在这个无限循环中,我们使用 time.Sleep(1 * time.Second) 让程序暂停一秒钟。
    • 之后我们增加 i (i++)
    • 然后我们打印 i之后

我们声明我们的主函数(主 goroutine):

func main(){    
    fmt.Println("launch goroutine")     
    go printNumber()    
    fmt.Println("launch goroutine")     
    go printNumber()    
    time.Sleep(1 * time.Minute) 
}

该程序的执行将输出以下内容:

launch first goroutine 
launch second goroutine 
1 
1 
2 
2 
3 
3 
4 
4

Channel

Channel是带有类型的通信管道

Goroutine 可以通过Channel相互通信。Channel可以看作是两个 goroutine 之间的数据管道。该管道只能支持特定类型。【Practical-Go-Lessons】Go并发-基础

Channel类型

Channel可以是:

  • 仅发送
  • 仅接收
  • 双向(可以发送或接收)

语法

  1. 可用于发送 T 类型值的通道表示为:chan<- T
  2. 可用于接收 T 类型值的通道表示为:<-chan T
  3. 可用于发送和接收 T 类型值的通道表示为:chan T

记忆点,看<- 符号指向即可。

【Practical-Go-Lessons】Go并发-基础Channel类型的零值为 nil。

Channel初始化,容量

channel是用 make 内置函数初始化的: 为了初始化 int 的双向无缓冲通道,您可以使用以下代码:

ch1 := make(chan int)

为了初始化字符串的双向缓冲通道,您可以使用以下代码:

ch2 := make(chan string3)

3是channel的容量。这是 Go 分配的空间,用于存储发送到通道的值。

无缓冲Channel

如果您在创建Channel时未指定其容量,则该Channel是无缓冲的。大小为零的Channel也是无缓冲的。 要创建无缓冲Channel,您可以使用以下代码源:

ch3 := make(chan float)

有缓冲Channel

缓冲Channel是您在创建通道时指定缓冲区大小的通道。

ch6 := make(chan float, 16)
【Practical-Go-Lessons】Go并发-基础
image.png

发送数据

假设我们有一个名为 ch5 的Channel。要将元素发送到此Channel,我们将使用箭头语法:<-。 这两个字符传达了数据从右向左流动的概念。

package main

func main() {
    ch5 := make(chan int2)
    ch5 <- 42
}

在前面的片段中:

  • 我们初始化一个双向缓冲整数Channel(ch5),
  • 然后我们将数字 42 发送到通道,语法如下:Channel <- 表达式。

发送语句的特殊性

发送语句有一些特定的规则:

  1. 在通信之前评估Channel和表达式
  2. 如果频道开放,您可以在频道上发送。如果您在封闭的频道上发送,您的程序将会出现恐慌!
  3. 如果您在零通道上发送,它会永远阻止您的程序。

关闭Channel

可以使用内置的 close 来关闭Channel。关闭通道表示“Channel上不会再发送任何值”。

  1. 无法关闭仅接收Channel
  2. 无法在已关闭的Channel上发送数据。
  3. 无法关闭已经关闭的Channel。
  4. 可以在已关闭Channel上接收数据(请参阅下一节)
package main

import "log"

func main() {
    var received int
    ch5 := make(chan int2)
    ch5 <- 42
    close(ch5)
}

这里我们将值 42 发送到Channel中,然后使用 close 内置函数关闭它:close(ch5)。

接收数据

package main

import "log"

func main() {
    var received int
    ch5 := make(chan int2)
    ch5 <- 42
    ch5 <- 41
    received = <-ch5
    log.Println(received)
}

在此代码片段中:

  • 我们创建一个整数缓冲通道:ch5;
  • 我们将值 42 和 41 发送到通道中;
  • 我们将收到的值设置为<-ch5;
  • 我们打印收到的值,前面的脚本将输出:42。

多值接收操作

使用此语法是为了确保我们的通道没有关闭或为空:

x, ok = <-ch 
if !ok {
    log.Println("channel is empty or closed")
}

在这个例子中,我们有变量 x 来保存通道中发送的值,但我们添加了一个 ok 变量来帮助我们确定通道是空的还是关闭的。ok 的值是一个布尔值,等于

  • true :一切正常,我们收到了一个值。
  • false :如果通道为空或关闭。
package main

import "fmt"

func main() {
    ch5 := make(chan int2)
    ch5 <- 42
    close(ch5)
    received, ok := <-ch5
    fmt.Println(received, ok)
}

/*
output: 42 , true
*/

语法总结

【Practical-Go-Lessons】Go并发-基础
image.png

轻松记忆

  • 当箭头指向通道时,我们将数据发送到通道中;
  • 否则我们从通道接收数据。

如何造成死锁

带有缓冲通道

// concurrency/deadlock/main.go
package main

import (
    "log"
)

func main() {
    ch := make(chan int1)
    go dummy(ch)
    log.Println("waiting for reception...")
    ch <- 45
    ch <- 58
    ch <- 100
}

func dummy(c chan int) {
    smth := <-c
    log.Println("has received something", smth)
}

此代码将导致解锁。这是执行结果:

2021/02/16 11:19:57 waiting for reception...
2021/02/16 11:19:57 has received something 45
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/maximilienandile/Documents/DEV/goBook/concurrency/sendBlocking/main.go:15 +0xea

Process finished with exit code 2
  • 我们创建了一个容量等于 1 的缓冲通道
  • 该通道被传递给一个新的 goroutine,它将接收该通道上的数据(dummy函数)。
  • 我们在通道上发送三个值:45、58 和 100。
  • 第一个值由虚拟 goroutine 接收。
    • 缓冲区上有空间可以存储。
  • 第二个值当我们在通道上发送第三个值时,主 Goroutine 将被阻塞它会被阻塞,
    • 直到第三个值被复制到通道缓冲区。
  • 该程序将无限期地等待。
【Practical-Go-Lessons】Go并发-基础
image.png

使用无缓冲通道

// concurrency/deadlock-unbuffered/main.go
package main

func main() {
    ch := make(chan int)
    ch <- 5
}

这个简单的程序会导致死锁。主 goroutine 无限期地等待,直到接收者收到发送的数据。

Select

Select 语句用于选择“将进行一组可能的发送或接收操作中的哪一个”。

  • select 语句类似于 switch case 语句,但用于通信操作。
  • 在 select 语句中,有case和可选的Default case。
  • 将选择第一个非阻塞情况。
  • 如果 2 个或更多情况没有阻塞,则通过“均匀伪随机”选择来选择单个情况。(随机选!不是顺序!)
  • 如果所有情况都阻塞,则选择默认情况。
【Practical-Go-Lessons】Go并发-基础
image.png

没有Default的情况

package main

import "log"

func main() {

    ch1 := make(chan string1)
    ch2 := make(chan string1)
    ch1 <- "test"

    select {
    case rec, ok := <-ch1:
        if ok {
            log.Printf("received on ch1 : %s", rec)
        }
    case rec, ok := <-ch2:
        if ok {
            log.Printf("received on ch2 : %s", rec)
        }
    }
    log.Println("end")
}
  • 我们有一个包含 2 个接收操作的 select 语句。
  • 我们正在等待 ch1 和 ch2 的接收。
  • 选择第一个非阻塞情况 => 这里是第一个情况 case rec, ok := <-ch1。

ch1 是第一个接收消息的通道。这是该程序的输出:

2021/02/16 17:05:27 received on ch1 : test
2021/02/16 17:05:27 end

Default

// concurrency/select-with-default/main.go
package main

import "log"

func main() {

    ch1 := make(chan string1)
    ch2 := make(chan string1)
    select {
    case rec, ok := <-ch1:
        if ok {
            log.Printf("received on ch1 : %s", rec)
        }
    case rec, ok := <-ch2:
        if ok {
            log.Printf("received on ch2 : %s", rec)
        }
    default:
        log.Println("default case")
    }
    log.Println("end")
}
  • 选择默认情况;
  • 两个通道上尚未发送;
  • 任何内容select语句的两种情况都是阻塞的,通信操作无法进行。

空选择语句

package main

func main() {
    select {}
}

这个程序会造成死锁。

通道和选择的一些常见用例

我将在本节中详细介绍一些我觉得有趣的通道和选择的常见用例。

HTTP 服务器的正常关闭

如果程序实现了“优雅关闭”机制,则它能够以干净且有序的方式停止。 该机制必须检测程序是否已被中断。os/signal 包有一个名为 Notify 的专用函数,可以检测操作系统信号,例如“中断”。 检测到这样的信号后,程序需要实现特定的关闭逻辑(保存一些状态,删除一些临时对象,关闭一些连接……)在本节中,我们将以 HTTP 服务器为例:

// concurrency/server-graceful-shutdown/main.go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // 创建通知信号
    bye := make(chan os.Signal)
    signal.Notify(bye, os.Interrupt, syscall.SIGTERM)

    mux := http.NewServeMux()
    mux.Handle("/status", http.HandlerFunc(
        func(w http.ResponseWriter, r *http.Request) {
            fmt.Fprintf(w, "OK")
        },
    ))
    srv := &http.Server{
        Addr:    ":8081",
        Handler: mux,
    }
    // 在另一个协程中启动服务
    go func() {
        // 加载服务
        err := srv.ListenAndServe()
        if err != nil && err != http.ErrServerClosed {
            log.Fatalf("server: %qn", err)
        }
    }()
    // 等待关闭信号
    sig := <-bye
    // the code below is executed when we receive an os.Signal
    log.Printf("detected os signal %s n", sig)
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    err := srv.Shutdown(ctx)
    cancel()
    if err != nil {
        log.Fatal(err)
    }
}
  • 当程序接收到以下信息时,signal.Notify 将使用通道 bye 通知我们的程序。
  • 中断信号 os.Interrupt
    • SIGTERM 信号 :os.SIGTERM
  • 创建了一个具有一条路由“/status”的经典服务器。
  • 服务器启动到一个新的 goroutine。
  • 在主 goroutine 上,我们等待潜在的操作系统信号 ;
  • sig := <-bye收到信号后,我们执行以下代码:
log.Printf("detected os signal %s n", sig)
err := srv.Shutdown(context.Background())
if err != nil {
    log.Fatal(err)
}
  1. 我们调用 srv.Shutdown 它将正常关闭服务器。
  2. 它将“首先关闭所有打开的侦听器,然后关闭所有空闲连接,然后无限期地等待连接返回空闲状态,然后关闭”。

超时

超时是事件发生之前允许的时间量。当您的程序依赖于可能不可用的外部资源时,实现超时可能会很有趣。如果没有超时,您的程序可能会无限期地等待资源。通过 select 语句,我们可以添加超时情况:

// concurrency/timeout/main.go
package main

import (
    "log"
    "time"
)

func main() {
    ch := make(chan int1)
    select {
    case rec, ok := <-ch:
        if ok {
            log.Printf("received %d", rec)
        }
    case rec, ok := <-time.After(time.Second * 3):
        if ok {
            log.Printf("operation timed out at %s", rec)
        }
    }
}
  • time.After 返回 time.Time 元素的仅接收通道
  • 3秒后发生超时。

WaitGroup

WaitGroup是标准库提供的同步工具。它可用于等待一组 goroutine 完成任务。

// concurrency/wait-group-without/main.go
package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Printf("Program start n")
    for i := 0; i < 10; i++ {
        go concurrentTaks(i)
    }
    finishTask()
    fmt.Printf("Program end n")

}

func finishTask() {
    fmt.Println("Executing finish task")
}

func concurrentTaks(taskNumber int) {
    fmt.Printf("BEGIN Execute task number %d n", taskNumber)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("END Execute task number %d n", taskNumber)
}
  • 我们将启动 10 个 goroutine,它们将调用函数并发任务(i),其中 i 是循环计数器的索引。
  • 启动这些 goroutine 后,我们将调用函数 finishTask

前面的程序会输出:

Program start
Executing finish task
Program end

看来我们甚至没有启动我们的 goroutine!为什么 ?那是因为启动 goroutine 不会阻塞主 goroutine ……我们可以修改源代码来添加WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    fmt.Printf("Program start n")
    // initialize the wait group
    var waitGroup sync.WaitGroup
    waitGroup.Add(10)
    for i := 0; i < 10; i++ {
        go concurrentTaks(i, &waitGroup)
    }
    waitGroup.Wait()
    finishTask()
    fmt.Printf("Program end n")

}

func finishTask() {
    fmt.Println("Executing finish task")
}

func concurrentTaks(taskNumber int, waitGroup *sync.WaitGroup) {
    fmt.Printf("BEGIN Execute task number %d n", taskNumber)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("END Execute task number %d n", taskNumber)
    waitGroup.Done()
}

此代码片段将输出以下内容:

Program start
BEGIN Execute task number 3
BEGIN Execute task number 9
BEGIN Execute task number 7
...
END Execute task number 9
END Execute task number 7
END Execute task number 3
Executing finish task
Program end

我们得到了预期的结果。我们的 goroutine 已经执行了。让我们看看发生了什么变化。

  • 在程序的开头,我们创建了一个类型为sync.WaitGroup的变量.
  • 我们向等待组指示他需要等待 10 个工作单元完成。
  • WaitGroup组的内部计数器将递增。
  • 然后我们将等待组的指针传递给我们的 goroutine。
go concurrentTaks(i, &waitGroup)

函数并发任务也被修改(它的签名,也是它的主体):

func concurrentTaks(taskNumber int, waitGroup *sync.WaitGroup) {
    fmt.Printf("BEGIN Execute task number %d n", taskNumber)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("END Execute task number %d n", taskNumber)
    waitGroup.Done()
}
  • 我们在函数末尾调用 waitGroup.Done() 方法。
  • Done 方法将减少内部计数器。

当内部计数器达到 0(意味着所有 goroutine 都已完成)时,主 goroutine 被释放;

for i := 0; i < 10; i++ {
    //...
}
waitGroup.Wait() // we block the main goroutine (to wait for the previous goroutines to finish)
finishTask()
fmt.Printf("Program end n")

你需要记住什么

  • 创建一个WaitGroup:
var waitGroup sync.WaitGroup
  • 增加WaitGroup计数器
waitGroup.Add(1)
  • 减少 wait group计数器(goroutine完成):
waitGroup.Done()
  • 阻塞当前的 goroutine 直到所有 goroutine 都完成:
waitGroup.Wait()
  • 必须将waitGroup的指针传递给您的 goroutine。

应用:WaitGroup和Channel

任务

电子商务网站将每次访问保存在数据库中。对于每次访问,我们都有:

  • 唯一 ID (UUID v4): 例如:5ce7fef2-c53a-4b79-bcbb-669c9799fa86;
  • 访问的页面(字符串)例如:/product-6;
  • 访问日期;
  • 会话哈希例如:8722b6f78a69aeac3736bfcaa1dd7e4e7a77834dec2adf27e007ed1c998b34df,它用于按会话对页面视图进行分组。

营销团队想要列出最受欢迎的产品及其相关性能。 他们希望获得每个日期每个页面的访问次数:

  • 日期 : 2020 年 1 月 25 日 :
    • /product-45 : 254 次访问
    • /product-22 : 2345 次访问
    • /home :2000 次访问。……
  • 日期 : 2020 年 1 月 30 日 :
    • /product-45 : 125 次访问
    • /home :2000 次访问。

…查询访问数据库后,您提取了一个具有以下格式的 JSON 文件:【Practical-Go-Lessons】Go并发-基础您需要构建一个具有以下输入/输出的程序:

  • INPUT :与前面的代码片段类似的 JSON 文件。
    • 您可以在此处下载示例 JSON 文件。
  • 输出:以 JSON 格式显示的每天统计数据。输出示例:
【Practical-Go-Lessons】Go并发-基础
image.png

建议

  • 需要将 INPUT 文件的内容加载到内存中。
  • 还需要解组加载的 JSON 数据。
  • 考虑如何将工作分成多个独立的任务。
  • 创建一个处理单个任务的工作函数。
  • worker可以通过通道与您的主 Goroutine 进行通信。

解决方案

加载输入以及解析Json

// concurrency/application1/visit/visit.go
package visit

type Visit struct {
    ID          string `json:"id"`
    Page        string `json:"page"`
    SessionHash string `json:"sessionHash"`
}

该结构类型被放入新的包访问中。

// concurrency/application1/main.go

data, err := ioutil.ReadFile("data.json")
if err != nil {
    log.Fatal(err)
}
dayStats := make(map[string][]visit.Visit)
err = json.Unmarshal(data, &dayStats)
if err != nil {
    log.Fatal(err)
}

如何分工

我们可以按日期划分工作。每个每日访问量都可以独立处理。我们创建一个任务结构:

type Task struct {
    Date   string
    Visits []visit.Visit
}

我们将通过输入通道为每个worker提供一个 Task 类型的变量。

输出结构

我们程序的输出是每日统计数据的一部分。我们创建一个类型结构 DailyStat 来保存特定日期的统计数据:

type DailyStat struct {
    Date   string         `json:"date"`
    ByPage map[string]int `json:"byPage"`
}

最终输出的类型为 []DailyStat(前一个结构的切片)。

创建waitGroup/Channel

var w8 sync.WaitGroup
w8.Add(len(dayStats))

inputCh := make(chan Task, 10)
outputCh := make(chan DailyStat, len(dayStats))
  • 我们初始化一个等待组:w8
  • 初始化后,我们将值 numberOfWorkers 添加到其内部计数器中。
  • 通过这样做,我们准备了等待每个Worker完成的程序。
  • 我们创建两个双向缓冲通道。

inputCh 将用于将任务发送给工作人员。

  • 该通道的缓冲区为 10
    • 通过将缓冲区大小设置为 10,我们将控制“吞吐量”(处理任务的速率)
    • 主goroutine最多可以向通道发送10个任务
    • 当发送 10 个任务时,主 Goroutine 将阻塞,直到可以添加新任务。

outputCh 将用于将worker的结果传达给主 goroutine。【Practical-Go-Lessons】Go并发-基础

Worker

// concurrency/application1/main.go 

func worker(in chan Task, workerId int, out chan DailyStat, w8 *sync.WaitGroup) {
    for received := range in {
        m := make(map[string]int)
        for _, v := range received.Visits {
            m[v.Page]++
        }
        out <- DailyStat{
            Date:   received.Date,
            ByPage: m,
        }
        fmt.Printf("[worker %d] finished task n", workerId)
    }
    // when the channel is closed the for loop is exited
    log.Println("worker quit")
    w8.Done()
}

worker是一个函数。它将输入通道和输出通道作为输入。

  • 通过 for 循环,我们可以遍历从输入通道接收到的消息。
  • 每条消息都是一个 Task 类型的变量.
  • 通过map,我们将计算每个页面的访问次数。
    • key:页面名称(例如:/product-8).
    • value:访问次数(例如:458)
  • 一旦统计了访问次数,我们就会将结果发送到输出通道。
  • 当关闭信号发送到输入通道时,
  • for循环将退出最后waitGroup计数器减一:w8.Done()。
  • 我们向主 Goroutine 发出信号,表明一个 Goroutine 已完成其任务。

创建 Worker 并将任务发送到输入通道

// create the workers
for k := 0; k < numberOfWorkers; k++ {
    go worker(inputCh, k, outputCh, &w8)
}
// send the tasks
for date, visits := range dayStats {
    inputCh <- Task{
        Date:   date,
        Visits: visits,
    }
}
// we say that we will not send any new data on the input channel
close(inputCh)
// wait for all tasks to be completed
w8.Wait()
  • 用 for 循环创建worker.
  • 迭代 dayStats 将任务发送到输入通道。
  • 当所有任务发送完毕后,我们关闭输入通道。

w8.Wait() 将等待所有工作人员完成。

收集结构

// when all treatment is finished we close the output channel
close(outputCh)
// collect the result
done := make([]DailyStat, 0len(dayStats))
for out := range outputCh {
    done = append(done, out)
}

res, err := json.Marshal(done)
if err != nil {
    log.Fatal(err)
}
err = ioutil.WriteFile("results.json", res, 0644)
if err != nil {
    log.Fatal(err)
}
log.Println("done !")
  • 最后一部分很简单
  • 我们关闭输出通道
  • 然后我们从输出通道收集结果(使用 for 循环)
  • 每个结果都附加到完成的切片中.

然后done 会被编组为 JSON,并将结果保存到 result.json 文件中。

Mutex

同步工具

互斥锁是一种同步工具。互斥量是互斥的缩写。让我们绕道逻辑理论来更好地理解这个概念。如果两个事件不能同时发生,我们就说它们是互斥的。例如,“自行车向左转弯”和“自行车向右转弯”这两个事件是互斥的。骑自行车的人不能同时左转和右转。 这种互斥性质对我们来说很有趣。当我们构建并发程序时,可能会发生数据竞争。如果两个(或更多)执行线程访问共享变量,则可能会发生数据竞争并且当一个(或多个)线程想要更新变量时并且线程 [@savage1997eraser] 中没有“阻止同时访问的显式机制”。我们可以通过使用互斥体来避免数据竞争。我将向您展示如何:

举例

// concurrency/mutex-bug/main.go
package main

import (
    "fmt"
    "log"
    "net/http"
)

var requestCount int

func main() {
    http.HandleFunc("/status", status)
    err := http.ListenAndServe(":8090"nil)
    if err != nil {
        log.Fatal(err)
    }
}

func status(w http.ResponseWriter, req *http.Request) {
    requestCount++
    fmt.Fprintf(w, "OK - count : %d n", requestCount)
}
  • 我们在这里创建了一个简单的 HTTP Web 服务器。
  • 它只有一个路由:“/status”
  • 我们创建了一个int类型的全局变量requestCount
  • 每次将请求发送到路由“/status”时,该变量都会递增。

让我们创建另一个程序来调用该服务器……

// concurrency/mutex-server-call/main.go
package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "sync"
)

func main() {
    var w8 sync.WaitGroup
    w8.Add(10)
    for k := 0; k < 10; k++ {
        go caller(&w8)
    }
    w8.Wait()
}

func caller(w8 *sync.WaitGroup) {
    for k := 0; k < 100; k++ {
        res, err := http.Get("http://localhost:8090/status")
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        s, err := ioutil.ReadAll(res.Body)
        if err != nil {
            log.Fatal(err)
        }
        log.Println(string(s))
    }
    w8.Done()
}

该程序将启动 10 个 goroutine,每个 goroutine 会同时向服务器发送 100 个请求(10 * 100 = 1,000 个请求)。请注意,我们使用等待组来等待所有 goroutine 完成。 该程序将打印每个请求收到的正文。这是最后 3 行:

//...
2021/02/17 18:43:49 OK - count : 989 
2021/02/17 18:43:49 OK - count : 990 
2021/02/17 18:43:49 OK - count : 991

哇 !我们提出了 1,000 个请求,但我们的柜台显示 991 …

添加Mutex

// concurrency/mutex-added/main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    "sync"
)

var mu sync.Mutex
var requestCount int

func main() {
    http.HandleFunc("/status", status)
    err := http.ListenAndServe(":8091"nil)
    if err != nil {
        log.Fatal(err)
    }
}

func status(w http.ResponseWriter, req *http.Request) {
    mu.Lock()
    requestCount++
    mu.Unlock()
    fmt.Fprintf(w, "OK - count : %d n", requestCount)
}
  • 我创建了一个名为 mu 的新全局变量,其类型为sync.Mutex。
  • 在Stateus函数中,我们在增量前后添加了 2 行新行。
  • 在递增之前我们调用 mu.Lock()。
  • 在递增之后调用 mu.Unlock() 。

如果我们测试该程序,计数器这次将按预期工作。sync.Mutex API 互斥 API 很简单。它有 2 个导出方法: Lock() :该方法将尝试获取锁,如果该锁已被另一个 goroutine 使用,它将阻塞直到它被释放。 Unlock() :该方法将释放锁。如果互斥体之前没有被锁定,它将生成运行时错误。

Mutex Hat

有一个非常常见的习惯用法,称为 Mutex hat。在前面的示例中,我们创建了一个全局互斥锁。不建议这样做。互斥锁通常用在结构类型中。

type DB struct {
    //...

    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections

    //...
}

这里互斥体 mu 保护 4 个字段。它被放置在这些字段的顶部(像帽子一样)。

并发程序设计

设计并发程序并不容易。这需要一些经验,但也需要一些方法论。在本节中,我们将详细介绍一些可用于编写并发程序的技巧。 我们将使用 T. Mattson 在其优秀著作 [@mattson2004patterns] 中提出的一些概念和想法。

确定任务

任务是一个操作(可以由子操作组成)。任务可以由以下部分组成:

  • 输入/输出操作
    • 读取文件
    • 打开 TCP 连接
  • “计算”操作
    • 获取整数除法的余数
    • 迭代切片以查找特定元素索引。

任务可以是单个函数、一个方法或一组函数和方法。

输入输出

在考虑一项任务时,清楚地隔离可能会有所帮助:

  • 输入:任务中使用的数据,举例 :
    • 整数切片
    • map
    • 指向变量的指针
  • 输出:这是任务返回的内容。举例 :
    • 输入数据的修改版本。
    • 计算结果。
    • 日期、字符串、持续时间。

确认依赖

依赖关系是两个或多个任务之间的链接。我们可以描述两种类型的依赖关系:

  • 顺序依赖性:任务需要在另一任务之前或之后执行。
  • 数据依赖:两个任务的执行依赖于同一条数据。

顺序依赖性示例

通常顺序依赖性很容易理解和检测。例如,让我们以电子商务网站为例。我们隔离了以下两项任务:

  • 处理订单的付款
  • 运送包裹

很明显,我们需要在发货之前处理付款。这两个任务之间存在顺序依赖性。一项任务需要在另一项任务之前完成。

数据依赖示例

让我们继续以电子商务软件为例。通常此类应用程序存储产品详细信息和每种产品的库存。您有以下任务:

  • 下订单后减少库存。
  • 读取库存以显示在网站前端

这两项任务取决于库存数据。在第一个任务(减少库存)中,我们更新它。在第二个中,我们刚刚读了它。这两个操作取决于库存变量。如果您在阅读股票的同时更新它,会发生什么?您可能会向客户显示错误的股票信息。如果您不实施防止数据竞争的机制,数据依赖性会导致数据竞争。

建模

第一步是确定:

  • 任务
  • 任务输入/输出
  • 任务之间的依赖关系

下一步是使用这些信息来构建您的程序。

  1. 如果你的任务之间没有依赖性,那么你就处于理想状态。您的所有任务都可以同时执行。
  2. 如果您注意到订单依赖性,那么您可以建立一个工作链。
    • 假设您有 3 组任务
    • 我们可以创建 3 个worker
    • worker将同时执行,但您必须确保任务将按照给定的顺序执行。
    • 可以通过连接 3 个worker的 3 个通道来实现。
  3. 当您发现数据依赖性时,您的程序可能会受到数据竞争的影响。
    • 添加互斥锁通常可以保护您免受此陷阱的影响。

当然这3种情况也可能同时出现。因此,编写并发程序可能会很困难。然而,映射任务和依赖关系肯定有助于设计过程。


原文始发于微信公众号(小唐云原生):【Practical-Go-Lessons】Go并发-基础

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/247560.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!