1. Channels简介
Channels是Go语言中比较重要的概念,它可以用来实现并发编程中的资源同步与通信,既可以作为并发锁,也可以用于在Go程序内部的不同协程之间传递数据。在Go语言中,通过channel来实现协程之间的数据传递,而channel是一种类型,作为并发编程的重要工具之一,也是Go语言很重要的一个特性。
1.1 Channels的类型定义
定义一个channel类型通常需要用到'chan'关键字,可以定义一个只写chan或只读chan,也可以定义双向的chan。定义一个只写chan需要在元素类型之前加'chan<-', 定义一个只读chan需要在元素类型之前加'<-chan',而普通的双向chan定义时不需要加任何关键字。
// 定义一个只读channel
func readChannel(c <-chan int){
for {
fmt.Println("read", <-c)
}
}
// 定义一个只写channel
func writeChannel(c chan<- int){
for i:=0; i<10; i++ {
c <- i
}
}
// 定义一个双向channel
func biChannel(c chan int){
for {
fmt.Println("bi", <-c)
}
}
1.2 Channels的使用
使用channel时,需要注意以下两点:
对于同一个channel,如果同时有多个协程在操作,它们之间会自动互斥,确保线程安全。
读写channel时会阻塞,如果没有数据或者读写方的处理速度不够快,都可能会导致阻塞。
2. 并发锁的实现
2.1 使用Channel实现互斥锁
在并发编程中,互斥锁是最基础的同步机制,可以保证在同一时间内只有一个协程对共享资源进行读写操作,因此很容易实现。在 Go 中,我们也可以通过 channel 来实现互斥锁,只需要定义一个 buffered channel 作为锁,然后通过 在临界区域之前向通道中发送值和在临界区域之后从通道中接收值的方式来实现互斥锁,从而保证同时只能有一个协程访问临界区域:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(5)
var mutex = make(chan struct{}, 1)
for i := 0; i < 5; i++ {
go func(id int) {
defer wg.Done()
fmt.Printf("[%d] is running\n", id)
wait := time.Duration(rand.Intn(3000))
time.Sleep(wait * time.Millisecond)
// 申请互斥锁
mutex <- struct{}{}
fmt.Printf("[%d] is using resource\n", id)
time.Sleep(1 * time.Second)
// 释放互斥锁
<-mutex
fmt.Printf("[%d] is done\n", id)
}(i)
}
wg.Wait()
}
上述代码中,make(chan struct{}, 1) 表示创建长度为1的buffered channel,当有协程申请互斥锁时,向 channel 中写入值,在使用临界区资源的时候协程互斥,保证同一时间只有一个协程在使用临界区资源,并在使用结束后释放锁,通过从 channel 中读出值来实现。
2.2 使用Channel实现条件变量锁
除了互斥锁之外,另外一个常用的锁是条件变量锁。条件变量锁可以用来实现协作模型,如生产者/消费者模式,在消费者没有发起请求的情况下,生产者不生产数据。这种模式常用于不同协程之间的通信。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var x int
var mutex sync.Mutex
cond := sync.NewCond(&mutex)
// 生产者
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(1) * time.Second)
fmt.Println("[write] before get lock")
mutex.Lock()
// 等待资源被消费
for x != 0 {
cond.Wait()
}
x = i
fmt.Printf("[write] %d\n", x)
// 生产结束,唤醒消费者进行消费
cond.Signal()
mutex.Unlock()
}
}()
// 消费者
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(2) * time.Second)
fmt.Println("[read] before get lock")
mutex.Lock()
// 等待资源可读
for x == 0 {
cond.Wait()
}
fmt.Printf("[read] %d\n", x)
x = 0
// 消费结束,唤醒生产者进行生产
cond.Signal()
mutex.Unlock()
}
}()
wg.Wait()
}
在上述代码中,sync.NewCond(&mutex) 表示创建条件变量锁。当消费者要读取数据时,先必须获取到互斥锁,同时根据条件变量判断数据是否可以被读取。如果还没有数据可以用来读取,则达成条件后,将协程进行阻塞,等待生产者生产出可用数据,生产者生产出数据之后,通过 signal 唤醒消费者协程进行读取。
3. 资源同步的实现
3.1 阻塞式与非阻塞式资源同步
在不同协程之间实现资源同步,即协程之间的消息传递,通常有阻塞式和非阻塞式两种方式。在阻塞式方式中,协程会一直等待信号达成,而在非阻塞式方式中,协程在信号未达成的情况下,会继续执行其它任务,不会一直等待。
3.2 使用Channel实现阻塞式资源同步
在 Go 语言中,可以通过 channel 来实现阻塞式资源同步。Channel中的数据被用来传递信号,而协程在通过channel收取数据时,如没有数据可以他去,就会被阻塞在该通道上,等到有能拿到的数据时再继续执行。因此,使用channel实现的阻塞式资源同步,就可以保证信号及时到达,从而协调协程的执行。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
// 创建有缓存的channel
ch := make(chan int, 1)
// 假设两个协程之间需要同时执行
// 获取signal和打印内容
go func() {
defer wg.Done()
for i := 0; i <= 5; i++ {
fmt.Println("[process 1] before recv data")
num := <-ch // 如果channel里放的是字符串,用str:= <-ch来取内容
fmt.Println("[process 1] recv data:", num)
fmt.Println("[process 1] before process data")
time.Sleep(500 * time.Millisecond)
fmt.Println("[process 1] processed data")
}
}()
go func() {
defer wg.Done()
for i := 0; i <= 5; i++ {
fmt.Println("[process 2] before send data")
ch <- i // 如果channel里放的是字符串,用ch <- "text"该方法写入
fmt.Println("[process 2] send data:", i)
fmt.Println("[process 2] before process data")
time.Sleep(500 * time.Millisecond)
fmt.Println("[process 2] processed data")
}
}()
wg.Wait()
}
在上述代码中,创造了一个带有缓存的channel,可以通过 <- ch 和 ch <- 的方式进行数据的读取和写入,而在读取和写入的过程中,如果Channel中没有数据或者没有位置能够写入数据,协程会被阻塞,直到对应的条件满足。
4. 总结
通过 channel 实现并发锁和实现资源同步的方式,都能够帮助我们更好地处理协程之间的关系,实现更高效的并发编程。当我们需要控制并发协程的数量,限制同时访问某个共享资源的线程数量,或者是通过单独的协程处理消息队列时,channel都是一个非常实用的工具。为了充分发挥 Go 语言的并发特性,大家可以多尝试使用 channel 进行协程的互斥、通信和控制等。