1. 什么是Goroutines
Goroutines是Go语言中的轻量级线程,它们可以让开发者方便地实现并发编程。与其他编程语言的线程相比,Goroutines拥有更小的栈空间和更高的并发处理能力,可以轻松地创建非常多的Goroutines,而不必担心操作系统无法处理。
要创建一个Goroutines,只需要在函数或方法前面添加go关键字即可:
func main() {
go func() {
fmt.Println("Goroutine被调用")
}()
fmt.Println("Goroutine还未被调用")
}
在这个例子中,我们使用了一个匿名函数创建了一个新的Goroutine,并在其中打印了一条信息。由于该函数前面加了go关键字,所以该函数中的代码会在新的Goroutine中运行,而不是在主Goroutine中运行。因此,我们会看到“Goroutine还未被调用”这条信息先被打印,而“Goroutine被调用”稍后才被打印。
2. 并发IO操作的问题
在并发编程中,经常会涉及到IO操作,如读写文件、网络请求等。这些操作通常是耗时的,如果在单个Goroutine中执行,会导致程序阻塞,无法充分利用CPU的资源。
例如,在读取多个文件并将它们合并后输出时,一个简单的实现方式是遍历文件列表,依次读取每个文件的内容并将其写入输出文件。代码如下:
func mergeFiles(files []string, output string) error {
out, err := os.Create(output)
if err != nil {
return err
}
defer out.Close()
for _, file := range files {
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(out, f)
if err != nil {
return err
}
}
return nil
}
这个函数遍历每个文件,依次打开、读取、写入输出文件,并在每次读取/写入后关闭文件。如果文件数量很多,并且文件大小较大,这个过程可能需要较长的时间。并且,在文件读写期间,程序将无法做其他有用的事情。
2.1 传统的解决方案:使用线程池
为了解决这个问题,我们通常会使用线程池来提高处理并发IO操作的能力。
线程池是一个预先创建的线程队列,用于处理任务队列中的工作。在任务队列中添加任务时,线程池中的线程会竞争执行任务,避免任务同时存在,提高任务执行效率。
在Go语言中,我们可以使用goroutine池来类似地解决并发IO的问题。我们可以使用channel从任务队列中获取任务,然后再将任务分发给worker goroutine处理。下面是一个例子:
type task struct {
file string
output io.Writer
}
func mergeFiles(files []string, output string) error {
out, err := os.Create(output)
if err != nil {
return err
}
defer out.Close()
tasks := make(chan task)
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for t := range tasks {
f, err := os.Open(t.file)
if err != nil {
log.Printf("Error opening %s: %s", t.file, err)
continue
}
defer f.Close()
_, err = io.Copy(t.output, f)
if err != nil {
log.Printf("Error copying %s: %s", t.file, err)
continue
}
}
}()
}
for _, file := range files {
tasks <- task{file, out}
}
close(tasks)
wg.Wait()
return nil
}
在这个例子中,我们创建了一个goroutine池,池中的goroutine会不断地从任务队列中获取任务并执行。
不过,这种方式仍然存在一些问题。首先,在Goroutines之间分发IO任务可能会导致调度的成本增加,因为任务并不是在每个Goroutine中都有相同的时间量得到执行。其次,竞争访问输出的次数会降低程序的性能,因为多个Goroutine可能会同时尝试往输出文件中写入数据。
2.2 如何通过Goroutines实现高效的并发IO操作
与使用线程池不同,在Go语言中,我们可以使用Goroutines和channel实现高效的并发IO操作。具体来说,我们可以将读取文件和写入文件的任务分离开来,并使用两个channel分别作为输入和输出。
在下面的例子中,我们创建了一个任务类型,其中包含了需要读取的文件名和需要写入的输出文件。
type task struct {
file string
output io.Writer
}
func readFile(taskChan chan task, resultChan chan []byte) {
for task := range taskChan {
f, err := os.Open(task.file)
if err != nil {
log.Printf("Error opening %s: %s", task.file, err)
continue
}
defer f.Close()
data, err := ioutil.ReadAll(f)
if err != nil {
log.Printf("Error reading %s: %s", task.file, err)
continue
}
resultChan <- data
}
}
func writeFile(resultChan chan []byte, numFiles int, out *os.File) {
var written int
for i := 0; i < numFiles; i++ {
data := <-resultChan
written, _ = out.Write(data)
if written != len(data) {
log.Printf("Error copying %s: short write", filename)
}
}
}
// 调用方式:
func mergeFiles(files []string, output string) error {
out, err := os.Create(output)
if err != nil {
return err
}
defer out.Close()
numFiles := len(files)
taskChan := make(chan task)
resultChan := make(chan []byte)
const numWorkers = 10
for i := 0; i < numWorkers; i++ {
go readFile(taskChan, resultChan)
}
go writeFile(resultChan, numFiles, out)
for _, file := range files {
taskChan <- task{file, out}
}
close(taskChan)
for range files {
<-resultChan
}
return nil
}
在这个例子中,我们使用了两个channel:taskChan和resultChan。taskChan中包含了需要读取的文件和需要写入文件,readFile goroutine 用来从taskChan中读取任务并执行。该goroutine会打开每个文件,读取文件的所有内容并将读取到的字节放入 resultChan 中。
writeFile goroutine 从 resultChan 中读取字节切片,并将其写入到输出文件中。在写入期间,只有一个 writeFile goroutine在运行,因此不需要进行加锁操作。
注意,在每个文件已被读取和写入完毕后,我们必须通过从resultChan中接收一个值来等待 writeFile goroutine 的完成。
通过这种方式,我们可以利用Goroutines和channel来避免竞争条件,并最大化利用CPU资源。
3. 总结
通过使用Goroutines和channel,我们可以轻松地实现高效的并发IO操作。相比于线程池等传统的方案,Golang提供的Goroutines和channel更为简单、高效和可控。
上面的例子展示了如何将一个大文件细分为许多小任务,并在多个Goroutines中执行这些任务。在处理大型数据集或文件时,这个方法可以显著提高程序的速度,并且消除了多线程的锁操作、调度成本和复杂性。