本篇介紹如何從RabbitMQ接收訊息然後推送到WebSocket client。
範例環境:
- Go 1.19
- RabbitMQ 3.11.4
事前要求
參考「Golang RabbitMQ hello world 接收訊息範例」。
參考「Golang WebSocket hello world」。
範例
下面結合了「Golang RabbitMQ hello world 接收訊息範例」及「Golang WebSocket hello world」的程式碼,從RabbitMQ的'hello' queue接收訊息後將訊息寫出至WebSocket client。
main.go
package main
import (
"fmt"
"net/http"
"github.com/gorilla/websocket"
amqp "github.com/rabbitmq/amqp091-go"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // diable CORS check
},
}
func main() {
http.HandleFunc("/notification", notificationHandler)
http.ListenAndServe(":8080", nil)
}
func notificationHandler(w http.ResponseWriter, r *http.Request) {
mqConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // create rabbitmq connection
if err != nil {
panic(err)
}
defer mqConn.Close()
ch, err := mqConn.Channel() // create message channel
if err != nil {
panic(err)
}
defer ch.Close()
// consume message from queue 'hello'
msgs, err := ch.Consume(
"hello", // queue name
"hello-consumer", // consumer name
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
wsConn, err := upgrader.Upgrade(w, r, nil) // get a websocket connection
if err != nil {
panic(err)
}
defer wsConn.Close()
messageHandler := func(bytes []byte) error {
fmt.Printf("push message=\"%s\"\n", bytes)
err = wsConn.WriteMessage(websocket.TextMessage, bytes) // write a message to client
if err != nil {
return err
}
return nil
}
for {
var forever chan struct{}
receive(msgs, messageHandler) // pass consumed message to messageHandler
<-forever
}
}
type MessageHandler func(bytes []byte) error
func receive(msgs <-chan amqp.Delivery, handler MessageHandler) {
go func() {
for d := range msgs {
handler(d.Body)
}
}()
}
測試
啟動Go專案。
Chrome瀏覽器安裝Simple WebSocket Client並打開。在[Server Location]的[URL]欄位輸入ws://localhost:8080/notification
開啟WebSocket連線。
在RabbitMQ UI建立名稱為"hello"的queue(自動binding default exchange)。
在hello queue中的[Publish message]的[Payload]輸入訊息並送出,則訊息會由default exchange發送至hello queue中。
接著hello queue的訊息會被Go程式的Consumer "hello-consumer"消費並交由WebSocket連線推送給client,最後在Simple WebSocket Client的[Message Log]可看到Go程式推送來的訊息。
沒有留言:
張貼留言