1. 并发与WorkerPool简介
并发与WorkerPool是Go语言里面非常重要的概念。在讲并发与WorkerPool之前,我们先来看看什么是并发和并行。
1.1 并发与并行的区别
并发是指在单处理器上,通过时间片轮转的方式,让每个任务都能得到一些执行时间,看似同时执行。但是每个任务之间都是有时间片的切换的,所以不是真正意义上的同时执行。
并行则是说在多个处理器上,每个处理器上都执行单独的任务,真正的同时执行。
Go语言天生支持并发,提供了非常多的并发化编程的方法。Worker Pool则是组织并发任务的一种好方式。
2. Go语言的并发编程实现方式
Go语言提供了非常简洁的并发编程实现方式,其实就是在每个任务并发执行时,用goroutine来实现,goroutine是一种非常轻量级的线程,一个程序可以产生很多个goroutine,而每个goroutine都能够让程序并发执行。
下面是一个示例代码:
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("begin...")
go a()
go b()
time.Sleep(1 * time.Second)
fmt.Println("done...")
}
func a() {
for i := 0; i < 10; i++ {
fmt.Println("a", i)
time.Sleep(100 * time.Millisecond)
}
}
func b() {
for i := 0; i < 10; i++ {
fmt.Println("b", i)
time.Sleep(100 * time.Millisecond)
}
}
上面的代码中,a和b方法都分别使用了go关键字来实现goroutine的并发执行,而main方法通过sleep方法来等待a和b方法执行完成。
3. WorkerPool的概念引入
虽然通过goroutine能够实现并发执行,但是却不能够控制goroutine的数量。这就可能导致系统资源消耗过高,进而导致程序崩溃或者变得非常慢。而Worker Pool则是一种将goroutine数量控制在合理范围内的一种方式。
4. 如何实现一个简单的WorkerPool
下面是一个简单的WorkerPool的实现:
package main
import (
"fmt"
)
type Job struct {
id int
}
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quitChannel chan bool
}
type WorkerPool struct {
JobQueue chan Job
WorkerPool chan chan Job
maxWorkers int
quitChannel chan bool
}
func NewWorkerPool(maxWorkers int) *WorkerPool {
jobQueue := make(chan Job, maxWorkers)
workerPool := make(chan chan Job, maxWorkers)
quitChannel := make(chan bool)
return &WorkerPool{
JobQueue: jobQueue,
WorkerPool: workerPool,
maxWorkers: maxWorkers,
quitChannel: quitChannel,
}
}
func (wp *WorkerPool) Run() {
for i := 0; i < wp.maxWorkers; i++ {
worker := NewWorker(wp.WorkerPool)
worker.Start()
}
go wp.dispatch()
}
func (wp *WorkerPool) dispatch() {
for {
select {
case job := <-wp.JobQueue:
go func(job Job) {
jobChannel := <-wp.WorkerPool
jobChannel <- job
}(job)
case <-wp.quitChannel:
return
}
}
}
func (wp *WorkerPool) AddJob(job Job) {
wp.JobQueue <- job
}
func (wp *WorkerPool) Stop() {
close(wp.quitChannel)
}
func NewWorker(workerPool chan chan Job) *Worker {
jobChannel := make(chan Job)
quitChannel := make(chan bool)
return &Worker{
WorkerPool: workerPool,
JobChannel: jobChannel,
quitChannel: quitChannel,
}
}
func (w *Worker) Start() {
go func() {
for {
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
fmt.Println(job.id)
case <-w.quitChannel:
return
}
}
}()
}
func (w *Worker) Stop() {
close(w.quitChannel)
}
func main() {
wp:=NewWorkerPool(10)
wp.Run()
for i:=0;i<10000;i++{
job:=Job{id:i}
wp.AddJob(job)
}
wp.Stop()
}
上面代码中我们定义了Job和Worker两个struct。Job代表了任务,而Worker则代表了具体执行任务的goroutine。WorkerPool则是一个调度器,用来控制Worker的创建和任务的分发。
4.1 NewWorkerPool
NewWorkerPool是WorkerPool的构造函数。其中jobQueue是用来存储任务的channel,workerPool是用来存储worker channel的channel,maxWorkers表示最大Worker数量,quitChannel为一个标志位,用来表示任务调度已经结束。
4.2 Run
Run方法是用来启动WorkerPool的。如下代码::
func (wp *WorkerPool) Run() {
for i := 0; i < wp.maxWorkers; i++ {
worker := NewWorker(wp.WorkerPool)
worker.Start()
}
go wp.dispatch()
}
上面代码中,我们创建了maxWorkers个的Worker,并调用Start方法对每个Worker进行启动,然后调度worker的分发。
4.3 AddJob
我们可以通过AddJob方法来为WorkerPool添加任务,如下代码::
func (wp *WorkerPool) AddJob(job Job) {
wp.JobQueue <- job
}
4.4 dispatch
dispatch方法是用来进行任务分发的,如下代码:
func (wp *WorkerPool) dispatch() {
for {
select {
case job := <-wp.JobQueue:
go func(job Job) {
jobChannel := <-wp.WorkerPool
jobChannel <- job
}(job)
case <-wp.quitChannel:
return
}
}
}
上面的代码中,我们通过select来监听JobQueue和quitChannel两个channel。当JobQueue中有任务时,我们会从WorkerPool中获取一个空闲的Worker,然后将任务分发给Worker。而当quitChannel被关闭时,任务调度结束。
4.5 NewWorker
NewWorker是用来构建Worker的。
4.6 Start
Start方法则是用来启动Worker的,实际上就是一个goroutine。
4.7 Stop
Stop方法用来停止Worker,并关闭quitChannel。
总结
通过上面的实现方式,我们已经成功的构建了一个简单的WorkerPool。WorkerPool是一个很重要的概念,可以让我们更好的控制goroutine并发执行数量,避免系统过载导致崩溃或者程序变慢。