網頁

2024/3/20

Golang 併發處理訊息練習

Go使用goroutine搭配channel併發處理大量訊息練習。


事前要求

參考「Golang 使用Channel在goroutine間傳遞資料」。


練習範例一

下面建立多個goroutine來處理發送到channel中的訊息,channel的角色類似queue(佇列),由單一來源接收訊息,然後將訊息發送給多個處理者。

注意這邊沒有考慮限制goroutine的數量,也就是pool的概念,又稱goroutine pool或worker pool,當訊息量大到一個級數時會產生過多的goroutine而超過系統記憶體的負擔。

main.go

package main

import (
    "fmt"
)

func main() {
    msgs := genMessages()        // 產生多筆訊息
    msgChen := make(chan string) // 建立訊息通道(channel)
    workerNum := len(msgs) / 10  // 要建立的goroutine(worker)數量 = 訊息數量 / 10
    for i := 0; i < workerNum; i++ {
        go worker(msgChen) // 建立goroutine來處理channel的訊息
    }
    send(msgs, msgChen) // 將訊息發送到channel
}

func send(msgs []string, msgChen chan string) {
    for j := 0; j < len(msgs); j++ {
        msgChen <- msgs[j] // 將訊息逐筆發送到channel
    }
    close(msgChen) // 訊息都送到channel後將其關閉
}

func worker(msgChen chan string) {
    for msg := range msgChen { // 將訊息從channel中取出。若channel中無訊息會阻塞
        time.Sleep(time.Second * 1) // 模擬處理每筆訊息要耗費的時間
        fmt.Printf("%s\n", msg)
    }
}

func genMessages() []string {
    msgs := make([]string, 0)
    for i := 0; i < 100; i++ {
        msgs = append(msgs, fmt.Sprintf("message %d", i))
    }
    return msgs
}

github



練習範例二

上面範例有個問題,也就是在worker還沒處理完訊息之前main程式就跑完了,導致訊息未全部印出,而這可以利用sync.WaitGroup來等待所有的goroutine都執行結束,修改如下。

main.go

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    msgs := genMessages()

    var wg sync.WaitGroup
    workerNum := len(msgs) / 10
    wg.Add(workerNum)
    
    msgChen := make(chan string)
    for i := 0; i < workerNum; i++ {
        go worker(&wg, msgChen)
    }
    
    send(msgs, msgChen)
    wg.Wait()
}

func send(msgs []string, msgChen chan string) {
    for j := 0; j < len(msgs); j++ {
        msgChen <- msgs[j]
    }
    close(msgChen)
}

func worker(wg *sync.WaitGroup, msgChen chan string) {
    defer wg.Done()
    for msg := range msgChen {
        time.Sleep(time.Second * 1)
        fmt.Printf("%s\n", msg)
    }
}

func genMessages() []string {
    msgs := make([]string, 0)
    for i := 0; i < 100; i++ {
        msgs = append(msgs, fmt.Sprintf("message %d", i))
    }
    return msgs
}

github




沒有留言:

張貼留言