本篇修改於「Golang 接收RabbitMQ訊息並推送到WebSocket client」,試著把RabbitMQ接收訊息與把訊息由WebSocket發送至WebSocket client的邏輯進行解耦的練習。
在原範例中RabbitMQ的consumer是在WebSocket的監聽for迴圈中接收訊息,收到訊息後直接轉交WebSocket寫出至clien;此外RabbitMQ consumer的建立與WebSocket連線是綁定的,即WebSocket連線請求進來後RabbitMQ的consumer才會被建立並開始消費queue的訊息,但通知系統應該是即時的,系統啟動後不論是否有WebSocket連線都應持續消費queue中的訊息。另外在原範例中沒有指定訊息的發送對象,這邊會加上WebSocket的使用者和發送訊息給指定使用者的邏輯。
注意下面範例未充分測試,尤其是處理多執行緒的部分可能有誤。
練習
mq/consumer.go
為消費RabbitMQ queue訊息的邏輯。介面Consumer
定義消費的方法Consume
並傳入訊息接收進來後的處理函式MessageHandler
參數。因此Consumer對訊息將如何被處理一無所知。
mq/consumer.go
package mq
import (
"encoding/json"
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
type Consuemr interface {
Consume(handler MessageHandler) error
}
type MessageHandler func(userId, content string) error
type Message struct {
UserId string `json:"userId"`
Content string `json:"content"`
}
type RabbitMQConsumer struct {
url string
}
func NewRabbitMQConsumer(url string) RabbitMQConsumer {
return RabbitMQConsumer{
url: url,
}
}
func (rc *RabbitMQConsumer) Consume(queueName, consumerName string, handler MessageHandler) error {
mqConn, err := amqp.Dial(rc.url) // create rabbitmq connection
if err != nil {
return err
}
defer mqConn.Close()
ch, err := mqConn.Channel() // create message channel
if err != nil {
return err
}
defer ch.Close()
// consume message from queue 'hello'
msgs, err := ch.Consume(
queueName, // queue name
consumerName, // consumer name
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
var forever chan struct{}
go func() {
for d := range msgs {
var message Message
err := json.Unmarshal(d.Body, &message)
if err != nil {
log.Printf("unmarshal queue message body=[%v] to message error, err=%v", d.Body, err)
}
err = handler(message.UserId, message.Content)
if err != nil {
log.Printf("handle userId=[%s]'s queue message error, err=%v", message.UserId, err)
}
}
}()
<-forever
return nil
}
ws/notifier.go
為WebSocket管理連線及寫出訊息的邏輯。介面Notifier
定義註冊及移除註冊WebSocket連線的方法Register
及Unregister
,發送訊息的方法Notify
。
Register
註冊連線是把連線請求的使用者userId
及該請求的WebSocket連線物件放入map connMap
中管理。當Notify
發送訊息時會以指定的userId
為key去connMap
找是否有符合的WebSocket連線物件,若找不到代表目前該訊息的推送對象未開啟連線。
Unregister
移除註冊連線則是把指定userId
的連線從connMap
移除並關閉連線。
存取connMap
時都加上了mutex鎖,因為要避免同時多個請求存取造成的執行緒干涉。
ws/notifier.go
package ws
import (
"errors"
"fmt"
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
type Notifier interface {
Register(userId string, conn *websocket.Conn)
Unregister(userId string) error
Notify(userId, message string) error
}
type WebSocketManager struct {
connMap map[string]*websocket.Conn
rwmu sync.RWMutex
}
func NewWebSocketManager() WebSocketManager {
return WebSocketManager{
connMap: make(map[string]*websocket.Conn),
}
}
func (wm *WebSocketManager) Register(userId string, conn *websocket.Conn) {
wm.rwmu.Lock()
defer wm.rwmu.Unlock()
wm.connMap[userId] = conn
log.Printf("userId=[%s] websocket connection registered", userId)
}
func (wm *WebSocketManager) Unregister(userId string) error {
wm.rwmu.Lock()
defer wm.rwmu.Unlock()
conn, ok := wm.connMap[userId]
if !ok {
return errors.New("unregister connection failed")
}
delete(wm.connMap, userId)
log.Printf("userId=[%s] websocket connection unregistered then closed", userId)
return conn.Close()
}
func (wm *WebSocketManager) GetConn(userId string) *websocket.Conn {
wm.rwmu.RLock()
defer wm.rwmu.RUnlock()
conn, ok := wm.connMap[userId]
if !ok {
return nil
}
return conn
}
func (wm *WebSocketManager) Notify(userId, message string) error {
conn := wm.GetConn(userId)
if conn == nil {
return fmt.Errorf("userId=[%s]'s websocket connection not found", userId)
}
err := conn.WriteMessage(websocket.TextMessage, []byte(message))
if err != nil {
return err
}
return nil
}
func OpenConn(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // diable CORS check
},
}
wsConn, err := upgrader.Upgrade(w, r, nil) // get a websocket connection
if err != nil {
return nil, err
}
return wsConn, nil
}
handler/handler.go
的NotificationHandler
定義WebSocket連線routing及處理RabbitMQ消費訊息轉交的MessageHandler
。
NotificationHandler.Handle
收到client送來的WebSocket連線請求後取得該請求的userId
並與WebSocket連線物件註冊到Notifer
,為Notifer.Notify
通知時的對象之一。
最下方的websocket.Conn.ReadMessage
會保持WebSocket連線的監聽狀態,在收到訊息或發生錯誤前會阻塞在此。若client關閉連線則會返回錯讓程式繼續直到結束呼叫defer的Notifier.Unregister
將此請求userId
的WebSocket連線取消註冊並關閉。
handler/handler.go
package handler
import (
"log"
"net/http"
"abc.com/demo/mq"
"abc.com/demo/ws"
)
type NotificationHandler struct {
notifier ws.Notifier
}
func NewNotificationHandler(notifier ws.Notifier) NotificationHandler {
return NotificationHandler{
notifier: notifier,
}
}
func (h *NotificationHandler) Handle() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
userId := r.URL.Query().Get("userId")
if userId == "" {
log.Print("refuse websocket connection, userId is required")
return
}
wsConn, err := ws.OpenConn(w, r)
if err != nil {
log.Printf("open websocket connection error, err=%v", err)
}
h.notifier.Register(userId, wsConn)
defer h.notifier.Unregister(userId)
_, _, err = wsConn.ReadMessage() // block until get message from client
if err != nil {
log.Printf("read message from websocket client error, err=%v", err)
}
}
}
func (h *NotificationHandler) MessageHandler() mq.MessageHandler {
return func(userId, message string) error {
return h.notifier.Notify(userId, message)
}
}
在main.go
會開另條goroutine執行Consumer.Consume
去消費RabbitMQ的訊息,訊息如何被處理則交由傳入的NotificationHandler.MessageHandler
處理。
NotificationHandler
需注入Notifier
物件為成員屬性,用於NotificationHandler.Handle
開啟WebSocket連線的註冊對象,及NotificationHandler.MessageHandler
處理RabbitMQ訊息時需要委由Notifier.Notify
發送通知訊息給有註冊WebSocket的userId
。
main.go
package main
import (
"net/http"
"abc.com/demo/handler"
"abc.com/demo/mq"
"abc.com/demo/ws"
)
func main() {
notifier := ws.NewWebSocketManager()
notificationHandler := handler.NewNotificationHandler(¬ifier)
consumer := mq.NewRabbitMQConsumer("amqp://guest:guest@localhost:5672/")
go consumer.Consume("hello", "hello-consumer", notificationHandler.MessageHandler())
http.HandleFunc("/notification", notificationHandler.Handle())
http.ListenAndServe(":8080", nil)
}
測試
啟動Go程式,開啟Simple WebSocket Client的[URL]欄位輸入ws://localhost:8080/notification?userId=123
發送WebSocket連線請求及query string userId=123
。
在RabbitMQ的hello queue的[Publish message]的[Payload]輸入下面訊息並送出,訊息會由default exchange發送至hello queue中。
{"userId":"123","content":"hello world"}
接著hello queue的訊息會被Go程式的Consumer "hello-consumer"消費並交由WebSocket連線推送給client,最後在Simple WebSocket Client的[Message Log]可看到Go程式推送來的訊息"hello world"。
最後點選Simple WebSocket Client的Close關閉連線。
從開始到結束console會印出以下訊息:
2022/12/15 01:17:00 userId=[123] websocket connection registered
2022/12/15 01:17:19 read message from websocket client error, err=websocket: close 1005 (no status)
2022/12/15 01:17:19 userId=[123] websocket connection unregistered then closed
沒有留言:
張貼留言