網頁

2022/12/7

Golang 接收RabbitMQ訊息並推送到WebSocket client

本篇介紹如何從RabbitMQ接收訊息然後推送到WebSocket client。


範例環境:

  • Go 1.19
  • RabbitMQ 3.11.4


事前要求

參考「Mac Homebrew 安裝RabbitMQ」。

參考「Golang RabbitMQ hello world 接收訊息範例」。

參考「Golang WebSocket hello world」。


範例

下面結合了「Golang RabbitMQ hello world 接收訊息範例」及「Golang WebSocket hello world」的程式碼,從RabbitMQ的'hello' queue接收訊息後將訊息寫出至WebSocket client。

main.go

package main

import (
    "fmt"
    "net/http"

    "github.com/gorilla/websocket"
    amqp "github.com/rabbitmq/amqp091-go"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true // diable CORS check
    },
}

func main() {

    http.HandleFunc("/notification", notificationHandler)
    http.ListenAndServe(":8080", nil)
}

func notificationHandler(w http.ResponseWriter, r *http.Request) {
    mqConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // create rabbitmq connection
    if err != nil {
        panic(err)
    }
    defer mqConn.Close()

    ch, err := mqConn.Channel() // create message channel
    if err != nil {
        panic(err)
    }
    defer ch.Close()

    // consume message from queue 'hello'
    msgs, err := ch.Consume(
        "hello",          // queue name
        "hello-consumer", // consumer name
        true,             // auto-ack
        false,            // exclusive
        false,            // no-local
        false,            // no-wait
        nil,              // args
    )
    if err != nil {
        panic(err)
    }

    wsConn, err := upgrader.Upgrade(w, r, nil) // get a websocket connection
    if err != nil {
        panic(err)
    }
    defer wsConn.Close()

    messageHandler := func(bytes []byte) error {
        fmt.Printf("push message=\"%s\"\n", bytes)
        err = wsConn.WriteMessage(websocket.TextMessage, bytes) // write a message to client
        if err != nil {
            return err
        }
        return nil
    }
    for {
        var forever chan struct{}
        receive(msgs, messageHandler) // pass consumed message to messageHandler
        <-forever
    }
}

type MessageHandler func(bytes []byte) error

func receive(msgs <-chan amqp.Delivery, handler MessageHandler) {
    go func() {
        for d := range msgs {
            handler(d.Body)
        }
    }()
}

github


測試

啟動Go專案。

Chrome瀏覽器安裝Simple WebSocket Client並打開。在[Server Location]的[URL]欄位輸入ws://localhost:8080/notification開啟WebSocket連線。

在RabbitMQ UI建立名稱為"hello"的queue(自動binding default exchange)。



在hello queue中的[Publish message]的[Payload]輸入訊息並送出,則訊息會由default exchange發送至hello queue中。



接著hello queue的訊息會被Go程式的Consumer "hello-consumer"消費並交由WebSocket連線推送給client,最後在Simple WebSocket Client的[Message Log]可看到Go程式推送來的訊息。





沒有留言:

張貼留言