Go使用goroutine搭配channel併發處理大量訊息練習。
事前要求
參考「Golang 使用Channel在goroutine間傳遞資料」。
練習範例一
下面建立多個goroutine來處理發送到channel中的訊息,channel的角色類似queue(佇列),由單一來源接收訊息,然後將訊息發送給多個處理者。
注意這邊沒有考慮限制goroutine的數量,也就是pool的概念,又稱goroutine pool或worker pool,當訊息量大到一個級數時會產生過多的goroutine而超過系統記憶體的負擔。
main.go
package main
import (
"fmt"
)
func main() {
msgs := genMessages() // 產生多筆訊息
msgChen := make(chan string) // 建立訊息通道(channel)
workerNum := len(msgs) / 10 // 要建立的goroutine(worker)數量 = 訊息數量 / 10
for i := 0; i < workerNum; i++ {
go worker(msgChen) // 建立goroutine來處理channel的訊息
}
send(msgs, msgChen) // 將訊息發送到channel
}
func send(msgs []string, msgChen chan string) {
for j := 0; j < len(msgs); j++ {
msgChen <- msgs[j] // 將訊息逐筆發送到channel
}
close(msgChen) // 訊息都送到channel後將其關閉
}
func worker(msgChen chan string) {
for msg := range msgChen { // 將訊息從channel中取出。若channel中無訊息會阻塞
time.Sleep(time.Second * 1) // 模擬處理每筆訊息要耗費的時間
fmt.Printf("%s\n", msg)
}
}
func genMessages() []string {
msgs := make([]string, 0)
for i := 0; i < 100; i++ {
msgs = append(msgs, fmt.Sprintf("message %d", i))
}
return msgs
}
練習範例二
上面範例有個問題,也就是在worker還沒處理完訊息之前main程式就跑完了,導致訊息未全部印出,而這可以利用sync.WaitGroup
來等待所有的goroutine都執行結束,修改如下。
main.go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
msgs := genMessages()
var wg sync.WaitGroup
workerNum := len(msgs) / 10
wg.Add(workerNum)
msgChen := make(chan string)
for i := 0; i < workerNum; i++ {
go worker(&wg, msgChen)
}
send(msgs, msgChen)
wg.Wait()
}
func send(msgs []string, msgChen chan string) {
for j := 0; j < len(msgs); j++ {
msgChen <- msgs[j]
}
close(msgChen)
}
func worker(wg *sync.WaitGroup, msgChen chan string) {
defer wg.Done()
for msg := range msgChen {
time.Sleep(time.Second * 1)
fmt.Printf("%s\n", msg)
}
}
func genMessages() []string {
msgs := make([]string, 0)
for i := 0; i < 100; i++ {
msgs = append(msgs, fmt.Sprintf("message %d", i))
}
return msgs
}
沒有留言:
張貼留言