使用 Go 协程(goroutines)可以让我们轻松实现并发编程,从而大大提升程序的运行效率。当我们从多个Go 协程中获取它们的执行结果时,可能会遇到丢失数据的问题。
问题背景
假如需要处理一个包含很多元素的数组,我们希望每个元素的处理都能同时进行,也就是并行处理。一般我们会为每个元素启动一个Go协程,并且在它们都完成后,收集它们的处理结果。
通常情况下,我们会使用sync.WaitGroup
来确保所有的协程都已经执行完毕,然后从一个通道(channel)中取出每个协程的结果。但有时候你会发现,并没有收到所有的结果,这是为什么?
问题分析
原因在于通道的容量以及协程的执行速度。当你尝试把一个结果放进通道的时候,如果通道已经满了,那么这个协程就会被“卡住”,直到通道中有空位为止。但是,如果有其他的协程在这个协程“卡住”的时候继续往通道里放结果,那么这些新来的结果就会丢失,这就是我们收不到所有结果的原因。
解决方案
可以采用以下方式:
- 设置通道的容量:如果知道会有多少个协程产生结果时,我们可以把通道的容量设置得足够大,以便能够装下所有的结果。比如,如果我们有10个协程,就可以把通道的容量设置为10。
- 在所有协程完成后关闭通道:当所有的协程都已经完成了它们的任务,并且把结果放入了通道后,我们就应该把通道关闭。这样做可以告诉那些正在等待取结果的地方,不会再有新的结果过来了。即使通道还没有完全被取空,
range
循环也会正常结束,确保所有的结果都被处理了。
代码示例
package main
import (
"fmt"
"sync"
"time"
)
// 定义一个函数,用于检查数字是否为奇数
func odd(i int) (int, error) {
time.Sleep(1 * time.Second)
if i%2 == 0 {
return i, fmt.Errorf("even number")
} else {
return i, nil
}
}
func main() {
// 定义一个结构体,用于存储协程的结果
type Result struct {
value int
err error
}
// 创建一个WaitGroup,用于等待所有协程完成
var wg sync.WaitGroup
// 创建一个通道,用于存放协程的结果,容量设置为10
responseChannel := make(chan Result, 10)
// 启动10个协程
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
result, err := odd(i)
response := Result{value: result, err: err}
responseChannel <- response
fmt.Printf("Queued Response: %d , size: %d \n", response.value, len(responseChannel))
}(i)
}
// 等待所有协程完成
wg.Wait()
// 关闭通道,表明所有协程都已经完成并将结果放入了通道
close(responseChannel)
fmt.Println("Done Waiting")
fmt.Println("Response Channel Length: ", len(responseChannel))
// 循环读取通道中的结果,直到通道被关闭
for response := range responseChannel {
if response.err != nil {
fmt.Printf("%d , %s\n", response.value, response.err.Error())
} else {
fmt.Printf("%d\n", response.value)
}
}
fmt.Println("Finished")
}