Golang 中利用 Channels 处理异步任务的返回值

1. Channels 概述

Golang 是一门支持并发的语言,它提供了一些基本的并发原语来协调 Go 程序中的多个 goroutine 的执行。其中,channels 就是 Go 语言中非常重要的并发原语之一。

Channels 可以看作是 goroutine 之间的通信机制。它们提供了一种在 goroutine 之间传递数据的机制,这种机制是线程安全的,并且能够隐式地同步 goroutine 的执行。

2. 为什么需要 Channel

在一个并发程序中,不同的 goroutine 往往需要协调完成某个任务。例如,一个 goroutine 取回数据,另一个 goroutine 进行处理,第三个 goroutine 将结果返回给最终的调用者。而这些 goroutine 之间的通信和协调需要依靠某种机制。

一般而言,可以通过共享内存来实现 goroutine 之间的通信。但是,这种方式需要进行复杂的锁保护和同步,容易引入死锁和竞态条件,而且代码的可读性和维护性也会变得非常差。

而 Channels 使用起来简单方便,并且能够保证 goroutine 之间的通信和同步。使用 Channels,可以将并发程序的复杂度降到最低,提高代码的可读性和可维护性。

3. Channels 的基本用法

Channel 是通过使用 make 函数创建的。如下所示:

ch := make(chan int) // 创建一个 int 类型的 Channel

上面的代码创建了一个可以传递 int 类型数据的 Channel。

当将一个值发送到 Channel 时(即使用 ch<-value 来发送数据),这个方法会阻塞,直到有其他 goroutine 从 Channel 上读取该值。如果没有其他 goroutine 读取该值,那么这个 goroutine 就被阻塞住了。

当从一个 Channel 中读取一个值时(即通过 value <- ch 来读取数据),这个方法会一直等待,直到有其他 goroutine 发送一个值到该 Channel 上为止。如果没有其他 goroutine 发送数据,那么这个 goroutine 也会被阻塞住。

下面是一个简单的 Channel 使用示例:

package main

import "fmt"

func main() {

ch := make(chan int)

go func() {

ch <- 1 // 往 Channel 中发送 1

}()

fmt.Println(<-ch) // 从 Channel 中读取值并输出

}

上面的代码创建了一个可以传递 int 类型数据的 Channel,然后启动了一个 goroutine,在这个 goroutine 中向 Channel 中发送了一个值1。在主 goroutine 中读取了这个值并输出。

运行上面的代码可以得到输出:

1

上面的代码使用了 \<- 运算符,该运算符用于在 Channel 和 goroutine 之间传递数据。但是,当需要将 Channel 作为一个参数传递给函数时,这个运算符并不方便使用。因此,Go 语言为 Channels 提供了专门的类型表示形式。

3.1. Channel 类型表示形式

Channel 类型可以写成 chan T 的形式,其中 T 表示 Channel 中传递数据的类型。例如,可以声明一个类型为 chan int 的 Channel,如下所示:

ch := make(chan int)

也可以定义一个类型为 chan string 的 Channel,如下所示:

ch := make(chan string)

3.2. Channel 的缓冲

当使用 make 创建一个 Channel 时,可以指定 Channel 的缓冲区大小。在不进行缓冲处理的情况下,Channel 只能传递一个值,同时这个操作是同步的。如果想要进行异步传递值,就需要使用缓冲。

可以通过在 make 中传递第二个参数指定缓冲区大小,如下所示:

ch := make(chan int, 10) // 创建一个缓存大小为 10 的 int 类型的 Channel

上面的代码指定了 Channel 缓冲大小为 10,这意味着可以向 Channel 中发送 10 个值而不会阻塞任何 goroutine。只有在 Channel 缓冲区满的时候,才会阻塞 goroutine。

当缓冲区已经满了的时候,如果尝试向 Channel 中发送数据,那么发送的 goroutine 就会被阻塞。只有当 Channel 缓冲区中有空位了,才能继续发送数据。

下面是一个使用缓冲 Channel 的示例:

package main

import "fmt"

func main() {

ch := make(chan int, 2)

ch <- 1

ch <- 2

fmt.Println(<-ch)

fmt.Println(<-ch)

}

上面的代码创建了一个类型为 int 的缓冲 Channel,缓冲大小为 2。然后连续向这个 Channel 中发送了两个值。最后从 Channel 中取出两个值并输出。

运行上面的代码可以得到输出:

1

2

3.3. Channel 的关闭

当不再需要一个 Channel 时,可以通过调用 close 函数来关闭它。关闭 Channel 后,就不能向 Channel 中发送数据了,但是仍然可以从中读取数据(直到 Channel 中的所有值都被读取完为止)。调用 close 函数后,Channel 中还有的数据可以继续被读取,但是 Channel 中没有数据了,后续的所有读操作都将返回一个零值和一个空的状态(即 ok 为 false)。

下面是一个关闭 Channel 的示例:

package main

import "fmt"

func main() {

ch := make(chan int, 2)

ch <- 1

ch <- 2

close(ch)

for {

v, ok := <-ch

if !ok {

break // Channel 已经关闭,退出循环

}

fmt.Println(v)

}

}

上面的代码创建了一个类型为 int 的缓冲 Channel,缓冲大小为 2。然后连续向这个 Channel 中发送了两个值,并调用 close 函数关闭了这个 Channel。最后使用 for 循环读取 Channel 中的所有值并输出。

运行上面的代码可以得到输出:

1

2

4. Channels 处理异步任务的返回值

在 Go 语言中,异步任务通常会使用 goroutine 来实现。但是,当需要获取异步任务的返回值时,就需要引入 Channel 了。

可以使用 Channel 来传递一个结果值,让另外一个 goroutine 来处理这个结果值。下面是一个使用 Channel 传递异步任务返回值的示例:

package main

import (

"fmt"

"math/rand"

)

func main() {

taskChan := make(chan int)

resultChan := make(chan int)

go func() {

for {

task := <-taskChan // 从 taskChan 中读取一个任务

// 处理任务(这里使用随机数模拟任务处理结果)

result := rand.Intn(100)

// 把处理结果写到 resultChan 中

resultChan <- result

}

}()

for i := 0; i < 10; i++ {

// 把任务写到 taskChan 中

taskChan <- i

// 从 resultChan 中读取处理结果

result := <-resultChan

fmt.Println(result)

}

close(taskChan) // 关闭 taskChan

close(resultChan) // 关闭 resultChan

}

上面的代码创建了两个 Channel,一个用于传递任务,一个用于传递处理结果。然后启动了一个 goroutine,该 goroutine 会一直从 taskChan 中读取任务,处理任务并将处理结果写到 resultChan 中。在主 goroutine 中,通过向 taskChan 中写入任务,然后读取 resultChan 中的处理结果来获取异步任务的返回值。

运行上面的代码可以得到输出:

9

12

74

91

9

7

25

16

88

73

上面的代码使用了无限循环来从 Channel 中读取任务和处理结果。但是,当任务处理完成后,它无法通知主 goroutine,因此主 goroutine 通过显式的写入标志告知异步任务已经完成,从而结束 goroutine 的循环。

为了使主 goroutine 能够获知异步任务已经完成,可以考虑在处理结果中添加一个标识符,表示这个结果是异步任务的结果,并且标识符和值使用一个结构体来进行封装。下面是一个使用结构体封装 Channel 传递异步任务返回值的示例:

package main

import (

"fmt"

"math/rand"

)

type AsyncResult struct {

IsAsync bool

Value int

}

func main() {

taskChan := make(chan int)

resultChan := make(chan AsyncResult)

go func() {

for {

task := <-taskChan // 从 taskChan 中读取一个任务

// 处理任务(这里使用随机数模拟任务处理结果)

result := rand.Intn(100)

// 把处理结果封装到 AsyncResult 中,写到 resultChan 中

resultChan <- AsyncResult{IsAsync: true, Value: result}

}

}()

for i := 0; i < 10; i++ {

// 把任务写到 taskChan 中

taskChan <- i

// 从 resultChan 中读取处理结果

result := <-resultChan

if result.IsAsync {

fmt.Println(result.Value)

} else {

break // 如果不是异步结果,说明已经结束了,退出循环

}

}

close(taskChan) // 关闭 taskChan

close(resultChan) // 关闭 resultChan

}

上面的代码在处理结果中添加了一个 IsAsync 标识符,表示这个结果是异步任务的结果。然后在主 goroutine 中从 resultChan 中读取结果,并判断这个结果是否是异步任务的结果,进而决定是否继续循环。

运行上面的代码可以得到输出:

81

37

0

51

38

45

92

35

23

3

在异步任务处理完成后,它会将处理结果封装到 AsyncResult 结构体中,并将这个结构体写入到 Channel 中。而主 goroutine 通过 IsAsync 标识符是否为 true 来判断这个结果是否是异步任务的结果。

4.1. 处理多个异步任务的返回值

在某些场景下,可能需要同时启动多个异步任务,并且等待它们全部完成后才能进行下一步处理。此时,可以将多个结果值保存到一个 Channel 中,然后通过在主 goroutine 中循环读取 Channel 中的值来等待所有异步任务的完成。

下面是一个处理多个异步任务返回值的示例:

package main

import (

"fmt"

"math/rand"

"sync"

)

func asyncTask(taskChan chan int, resultChan chan AsyncResult, wg *sync.WaitGroup) {

defer wg.Done() // 当前任务处理完成后调用 wg.Done 来通知 WaitGroup 这个任务已经完成

for {

task, ok := <-taskChan // 从 taskChan 中读取一个任务

if !ok {

return // 如果 Channel 已经关闭了,就退出任务处理函数

}

// 处理任务(这里使用随机数模拟任务处理结果)

result := rand.Intn(100)

// 把处理结果封装到 AsyncResult 中,写到 resultChan 中

resultChan <- AsyncResult{IsAsync: true, Value: result}

}

}

func main() {

taskChan := make(chan int)

resultChan := make(chan AsyncResult)

var wg sync.WaitGroup

for i := 0; i < 10; i++ {

wg.Add(1) // 每启动一个任务都会调用 wg.Add 来增加 WaitGroup 计数器

go asyncTask(taskChan, resultChan, &wg)

}

go func() {

wg.Wait() // 当所有的异步任务都完成后,调用 wg.Wait 来等待它们全部返回

close(resultChan) // 当任务全部完成后,关闭 resultChan

}()

for i := 0; i < 10; i++ {

// 把任务写到 taskChan 中

taskChan <- i

}

for result := range resultChan {

if result.IsAsync {

fmt.Println(result.Value)

} else {

break // 如果不是异步结果,说明已经结束了,退出循环

}

}

close(taskChan) // 关闭 taskChan

}

上面的代码定义了一个 asyncTask 函数来处理任务,该函数使用 WaitGroup 来同步异步任务的处理。在主 goroutine 中,首先创建了一个 WaitGroup,并且同时启动了 10 个异步任务来处理任务。然后使用 for 循环向 taskChan 中写入任务。之后,启动另一个 goroutine 来等待所有异步任务的完成。这个 goroutine 使用 WaitGroup 来同步处理任务的完成,并在所有异步任务完成后关闭 resultChan。最后在主 goroutine 中循环读取 resultChan 中的异步处理结果。

运行上面的代码可以得到输出:

4

87

4

96

61

50

66

73

60

61

上面的代码使用了 WaitGroup 来同步多个异步任务的处理。WaitGroup 是一个计数器,它的值初始化为 0,然后每启动一个任务都会调用 wg.Add 来增加计数器,任务处理完成后通过调用 wg.Done 来减少计数器。主 goroutine 通过调用 wg.Wait 来等待所有任务处理完成。当

后端开发标签