Go语言的并发与WorkerPool

1. 并发与WorkerPool简介

并发与WorkerPool是Go语言里面非常重要的概念。在讲并发与WorkerPool之前,我们先来看看什么是并发和并行。

1.1 并发与并行的区别

并发是指在单处理器上,通过时间片轮转的方式,让每个任务都能得到一些执行时间,看似同时执行。但是每个任务之间都是有时间片的切换的,所以不是真正意义上的同时执行。

并行则是说在多个处理器上,每个处理器上都执行单独的任务,真正的同时执行。

Go语言天生支持并发,提供了非常多的并发化编程的方法。Worker Pool则是组织并发任务的一种好方式。

2. Go语言的并发编程实现方式

Go语言提供了非常简洁的并发编程实现方式,其实就是在每个任务并发执行时,用goroutine来实现,goroutine是一种非常轻量级的线程,一个程序可以产生很多个goroutine,而每个goroutine都能够让程序并发执行。

下面是一个示例代码:

package main

import (

"fmt"

"time"

)

func main() {

fmt.Println("begin...")

go a()

go b()

time.Sleep(1 * time.Second)

fmt.Println("done...")

}

func a() {

for i := 0; i < 10; i++ {

fmt.Println("a", i)

time.Sleep(100 * time.Millisecond)

}

}

func b() {

for i := 0; i < 10; i++ {

fmt.Println("b", i)

time.Sleep(100 * time.Millisecond)

}

}

上面的代码中,a和b方法都分别使用了go关键字来实现goroutine的并发执行,而main方法通过sleep方法来等待a和b方法执行完成。

3. WorkerPool的概念引入

虽然通过goroutine能够实现并发执行,但是却不能够控制goroutine的数量。这就可能导致系统资源消耗过高,进而导致程序崩溃或者变得非常慢。而Worker Pool则是一种将goroutine数量控制在合理范围内的一种方式。

4. 如何实现一个简单的WorkerPool

下面是一个简单的WorkerPool的实现:

package main

import (

"fmt"

)

type Job struct {

id int

}

type Worker struct {

WorkerPool chan chan Job

JobChannel chan Job

quitChannel chan bool

}

type WorkerPool struct {

JobQueue chan Job

WorkerPool chan chan Job

maxWorkers int

quitChannel chan bool

}

func NewWorkerPool(maxWorkers int) *WorkerPool {

jobQueue := make(chan Job, maxWorkers)

workerPool := make(chan chan Job, maxWorkers)

quitChannel := make(chan bool)

return &WorkerPool{

JobQueue: jobQueue,

WorkerPool: workerPool,

maxWorkers: maxWorkers,

quitChannel: quitChannel,

}

}

func (wp *WorkerPool) Run() {

for i := 0; i < wp.maxWorkers; i++ {

worker := NewWorker(wp.WorkerPool)

worker.Start()

}

go wp.dispatch()

}

func (wp *WorkerPool) dispatch() {

for {

select {

case job := <-wp.JobQueue:

go func(job Job) {

jobChannel := <-wp.WorkerPool

jobChannel <- job

}(job)

case <-wp.quitChannel:

return

}

}

}

func (wp *WorkerPool) AddJob(job Job) {

wp.JobQueue <- job

}

func (wp *WorkerPool) Stop() {

close(wp.quitChannel)

}

func NewWorker(workerPool chan chan Job) *Worker {

jobChannel := make(chan Job)

quitChannel := make(chan bool)

return &Worker{

WorkerPool: workerPool,

JobChannel: jobChannel,

quitChannel: quitChannel,

}

}

func (w *Worker) Start() {

go func() {

for {

w.WorkerPool <- w.JobChannel

select {

case job := <-w.JobChannel:

fmt.Println(job.id)

case <-w.quitChannel:

return

}

}

}()

}

func (w *Worker) Stop() {

close(w.quitChannel)

}

func main() {

wp:=NewWorkerPool(10)

wp.Run()

for i:=0;i<10000;i++{

job:=Job{id:i}

wp.AddJob(job)

}

wp.Stop()

}

上面代码中我们定义了Job和Worker两个struct。Job代表了任务,而Worker则代表了具体执行任务的goroutine。WorkerPool则是一个调度器,用来控制Worker的创建和任务的分发。

4.1 NewWorkerPool

NewWorkerPool是WorkerPool的构造函数。其中jobQueue是用来存储任务的channel,workerPool是用来存储worker channel的channel,maxWorkers表示最大Worker数量,quitChannel为一个标志位,用来表示任务调度已经结束。

4.2 Run

Run方法是用来启动WorkerPool的。如下代码::

func (wp *WorkerPool) Run() {

for i := 0; i < wp.maxWorkers; i++ {

worker := NewWorker(wp.WorkerPool)

worker.Start()

}

go wp.dispatch()

}

上面代码中,我们创建了maxWorkers个的Worker,并调用Start方法对每个Worker进行启动,然后调度worker的分发。

4.3 AddJob

我们可以通过AddJob方法来为WorkerPool添加任务,如下代码::

func (wp *WorkerPool) AddJob(job Job) {

wp.JobQueue <- job

}

4.4 dispatch

dispatch方法是用来进行任务分发的,如下代码:

func (wp *WorkerPool) dispatch() {

for {

select {

case job := <-wp.JobQueue:

go func(job Job) {

jobChannel := <-wp.WorkerPool

jobChannel <- job

}(job)

case <-wp.quitChannel:

return

}

}

}

上面的代码中,我们通过select来监听JobQueue和quitChannel两个channel。当JobQueue中有任务时,我们会从WorkerPool中获取一个空闲的Worker,然后将任务分发给Worker。而当quitChannel被关闭时,任务调度结束。

4.5 NewWorker

NewWorker是用来构建Worker的。

4.6 Start

Start方法则是用来启动Worker的,实际上就是一个goroutine。

4.7 Stop

Stop方法用来停止Worker,并关闭quitChannel。

总结

通过上面的实现方式,我们已经成功的构建了一个简单的WorkerPool。WorkerPool是一个很重要的概念,可以让我们更好的控制goroutine并发执行数量,避免系统过载导致崩溃或者程序变慢。

后端开发标签