Golang 中的并发模式之 Channels 和 Pipelines

1. Channels 和 Pipelines 概述

在 Golang 中,Channels 和 Pipelines 是并发编程中的核心概念。Channel 是用来协调不同线程之间通信和同步的一个机制,它可以在不同线程之间传递数据,而不需要锁和条件变量。Pipeline 是将多个任务组合在一起形成一个流水线,每个任务都可以并行运行且可以有独立的输入和输出。

通过使用 Channel 和 Pipeline,程序可以更加简单地表达并发任务之间的依赖关系,使得程序的结构更加清晰,代码更加容易读懂、维护和修改。

2. Channels

2.1 Channel 的创建和基本操作

在 Golang 中,使用 make 函数可以创建一个 Channel。Channel 是一种类型,它需要指定传输的数据类型。创建一个 int 类型的 Channel,可以使用以下代码:

ch := make(chan int)

Channel 支持基本的操作,包括发送和接收数据。发送数据使用 <- 操作符,接收数据使用 <- 操作符:

ch <- 1 // 发送数据

x := <- ch // 接收数据

发送操作会阻塞,直到有其它线程从 Channel 中读取了数据。相对地,接收操作会阻塞,直到 Channel 中有数据可以读取。

2.2 Channel 的缓冲

除了使用无缓冲 Channel,Golang 中还可以创建带缓冲的 Channel。

缓冲 Channel 的创建方式为:

ch := make(chan int, 10)

第二个参数为 Channel 的缓冲大小。在缓冲区未满之前,发送操作不会阻塞。当缓冲区满时,发送操作会阻塞。

3. Pipelines

3.1 Pipeline 的实现方法

在 Golang 中,我们可以使用 Channel 来实现 Pipeline。Pipeline 可以看做是多个 Channel 的组合,每个 Channel 负责连接相邻的任务。每个 Channel 可以有缓冲或无缓冲,使得Pipeline在不同的场景下具有更好的表达能力。以下是一个简单的 Pipeline 的例子:

func gen(nums ...int) chan int {

out := make(chan int)

go func() {

for _, n := range nums {

out <- n

}

close(out)

}()

return out

}

func sq(in chan int) chan int {

out := make(chan int)

go func() {

for n := range in {

out <- n * n

}

close(out)

}()

return out

}

func main() {

c := gen(2, 3)

out := sq(c)

fmt.Println(<-out)

fmt.Println(<-out)

}

在这个例子中,gen 函数将参数中的数字逐个发送到 Channel 中,最后关闭 Channel。sq 函数从 gen 函数返回的 Channel 中读取数字并计算平方,并将结果发送到一个新的 Channel 中。

main 函数从 sq 函数返回的 Channel 中读取最终结果并打印输出。

3.2 Pipeline 的应用场景

Pipeline 可以应用于很多场景,例如数据分析、机器学习等领域,这些场景中往往需要进行多个任务的组合和计算。

下面是一个简单的 Pipeline 的应用示例:计算质数个数。我们需要计算出指定的区间内的所有质数,并统计其个数。

func generate(ch chan int) {

for i := 2; i < max; i++ {

ch <- i

}

close(ch)

}

func filter(in, out chan int, prime int) {

for {

i, ok := <-in

if !ok {

close(out)

return

}

if i%prime != 0 {

out <- i

}

}

}

func sieve() int {

ch := make(chan int)

go generate(ch)

prime := 0

for {

n, ok := <-ch

if !ok {

return prime

}

prime = n

ch1 := make(chan int)

go filter(ch, ch1, prime)

ch = ch1

}

}

const max = 10000

func main() {

count := 0

for n := range generatePrimes() {

count++

}

fmt.Println("Primes count:", count)

}

在上述代码中,generate 函数生成了从 2 到 max 的所有数字,并依次将它们发送到 Channel 中。filter 函数从输入的 Channel 中读取数字,将不是质数的数字过滤掉,并将剩余的数字发送到输出的 Channel 中。sieve 函数使用管道连接了多个 filter 函数,它从起始的 Channel 中读取数据,选择出一个质数,将不是质数的数字传递给下一个 filter 函数。最终,生成的 Channel 包含了指定区间内的所有质数,程序通过读取该 Channel 来统计质数个数。

总的来说,Golang 中的 Channels 和 Pipelines 可以为并发编程提供强大的支持,使得编写高效而可维护的并发程序变得更加容易。

后端开发标签