網頁

2022/12/14

Golang push RabbitMQ message to WebSocket client decouple 練習

本篇修改於「Golang 接收RabbitMQ訊息並推送到WebSocket client」,試著把RabbitMQ接收訊息與把訊息由WebSocket發送至WebSocket client的邏輯進行解耦的練習。


在原範例中RabbitMQ的consumer是在WebSocket的監聽for迴圈中接收訊息,收到訊息後直接轉交WebSocket寫出至clien;此外RabbitMQ consumer的建立與WebSocket連線是綁定的,即WebSocket連線請求進來後RabbitMQ的consumer才會被建立並開始消費queue的訊息,但通知系統應該是即時的,系統啟動後不論是否有WebSocket連線都應持續消費queue中的訊息。另外在原範例中沒有指定訊息的發送對象,這邊會加上WebSocket的使用者和發送訊息給指定使用者的邏輯。


注意下面範例未充分測試,尤其是處理多執行緒的部分可能有誤。


練習

mq/consumer.go為消費RabbitMQ queue訊息的邏輯。介面Consumer定義消費的方法Consume並傳入訊息接收進來後的處理函式MessageHandler參數。因此Consumer對訊息將如何被處理一無所知。

mq/consumer.go

package mq

import (
    "encoding/json"
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

type Consuemr interface {
    Consume(handler MessageHandler) error
}

type MessageHandler func(userId, content string) error

type Message struct {
    UserId  string `json:"userId"`
    Content string `json:"content"`
}

type RabbitMQConsumer struct {
    url string
}

func NewRabbitMQConsumer(url string) RabbitMQConsumer {
    return RabbitMQConsumer{
        url: url,
    }
}

func (rc *RabbitMQConsumer) Consume(queueName, consumerName string, handler MessageHandler) error {
    mqConn, err := amqp.Dial(rc.url) // create rabbitmq connection
    if err != nil {
        return err
    }
    defer mqConn.Close()

    ch, err := mqConn.Channel() // create message channel
    if err != nil {
        return err
    }
    defer ch.Close()

    // consume message from queue 'hello'
    msgs, err := ch.Consume(
        queueName,    // queue name
        consumerName, // consumer name
        true,         // auto-ack
        false,        // exclusive
        false,        // no-local
        false,        // no-wait
        nil,          // args
    )
    if err != nil {
        return err
    }
    var forever chan struct{}
    go func() {
        for d := range msgs {
            var message Message
            err := json.Unmarshal(d.Body, &message)
            if err != nil {
                log.Printf("unmarshal queue message body=[%v] to message error, err=%v", d.Body, err)
            }
            err = handler(message.UserId, message.Content)
            if err != nil {
                log.Printf("handle userId=[%s]'s queue message error, err=%v", message.UserId, err)
            }
        }
    }()
    <-forever
    return nil
}

ws/notifier.go為WebSocket管理連線及寫出訊息的邏輯。介面Notifier定義註冊及移除註冊WebSocket連線的方法RegisterUnregister,發送訊息的方法Notify

Register註冊連線是把連線請求的使用者userId及該請求的WebSocket連線物件放入map connMap中管理。當Notify發送訊息時會以指定的userId為key去connMap找是否有符合的WebSocket連線物件,若找不到代表目前該訊息的推送對象未開啟連線。

Unregister移除註冊連線則是把指定userId的連線從connMap移除並關閉連線。

存取connMap時都加上了mutex鎖,因為要避免同時多個請求存取造成的執行緒干涉。

ws/notifier.go

package ws

import (
    "errors"
    "fmt"
    "log"
    "net/http"
    "sync"

    "github.com/gorilla/websocket"
)

type Notifier interface {
    Register(userId string, conn *websocket.Conn)
    Unregister(userId string) error
    Notify(userId, message string) error
}

type WebSocketManager struct {
    connMap map[string]*websocket.Conn
    rwmu    sync.RWMutex
}

func NewWebSocketManager() WebSocketManager {
    return WebSocketManager{
        connMap: make(map[string]*websocket.Conn),
    }
}

func (wm *WebSocketManager) Register(userId string, conn *websocket.Conn) {
    wm.rwmu.Lock()
    defer wm.rwmu.Unlock()
    wm.connMap[userId] = conn
    log.Printf("userId=[%s] websocket connection registered", userId)
}

func (wm *WebSocketManager) Unregister(userId string) error {
    wm.rwmu.Lock()
    defer wm.rwmu.Unlock()
    conn, ok := wm.connMap[userId]
    if !ok {
        return errors.New("unregister connection failed")
    }
    
    delete(wm.connMap, userId)
    log.Printf("userId=[%s] websocket connection unregistered then closed", userId)
    return conn.Close()
}

func (wm *WebSocketManager) GetConn(userId string) *websocket.Conn {
    wm.rwmu.RLock()
    defer wm.rwmu.RUnlock()

    conn, ok := wm.connMap[userId]
    if !ok {
        return nil
    }
    return conn
}

func (wm *WebSocketManager) Notify(userId, message string) error {
    conn := wm.GetConn(userId)
    if conn == nil {
        return fmt.Errorf("userId=[%s]'s websocket connection not found", userId)
    }

    err := conn.WriteMessage(websocket.TextMessage, []byte(message))
    if err != nil {
        return err
    }
    return nil
}

func OpenConn(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
    upgrader := websocket.Upgrader{
        CheckOrigin: func(r *http.Request) bool {
            return true // diable CORS check
        },
    }

    wsConn, err := upgrader.Upgrade(w, r, nil) // get a websocket connection
    if err != nil {
        return nil, err
    }
    return wsConn, nil
}

handler/handler.goNotificationHandler定義WebSocket連線routing及處理RabbitMQ消費訊息轉交的MessageHandler

NotificationHandler.Handle收到client送來的WebSocket連線請求後取得該請求的userId並與WebSocket連線物件註冊到Notifer,為Notifer.Notify通知時的對象之一。

最下方的websocket.Conn.ReadMessage會保持WebSocket連線的監聽狀態,在收到訊息或發生錯誤前會阻塞在此。若client關閉連線則會返回錯讓程式繼續直到結束呼叫defer的Notifier.Unregister將此請求userId的WebSocket連線取消註冊並關閉。

handler/handler.go

package handler

import (
    "log"
    "net/http"

    "abc.com/demo/mq"
    "abc.com/demo/ws"
)

type NotificationHandler struct {
    notifier ws.Notifier
}

func NewNotificationHandler(notifier ws.Notifier) NotificationHandler {
    return NotificationHandler{
        notifier: notifier,
    }
}

func (h *NotificationHandler) Handle() http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        userId := r.URL.Query().Get("userId")
        if userId == "" {
            log.Print("refuse websocket connection, userId is required")
            return
        }

        wsConn, err := ws.OpenConn(w, r)
        if err != nil {
            log.Printf("open websocket connection error, err=%v", err)
        }

        h.notifier.Register(userId, wsConn)
        defer h.notifier.Unregister(userId)

        _, _, err = wsConn.ReadMessage() // block until get message from client
        if err != nil {
            log.Printf("read message from websocket client error, err=%v", err)
        }

    }
}

func (h *NotificationHandler) MessageHandler() mq.MessageHandler {
    return func(userId, message string) error {
        return h.notifier.Notify(userId, message)
    }
}


main.go會開另條goroutine執行Consumer.Consume去消費RabbitMQ的訊息,訊息如何被處理則交由傳入的NotificationHandler.MessageHandler處理。

NotificationHandler需注入Notifier物件為成員屬性,用於NotificationHandler.Handle開啟WebSocket連線的註冊對象,及NotificationHandler.MessageHandler處理RabbitMQ訊息時需要委由Notifier.Notify發送通知訊息給有註冊WebSocket的userId

main.go

package main

import (
    "net/http"

    "abc.com/demo/handler"
    "abc.com/demo/mq"
    "abc.com/demo/ws"
)

func main() {
    notifier := ws.NewWebSocketManager()
    notificationHandler := handler.NewNotificationHandler(&notifier)
    consumer := mq.NewRabbitMQConsumer("amqp://guest:guest@localhost:5672/")
    go consumer.Consume("hello", "hello-consumer", notificationHandler.MessageHandler())

    http.HandleFunc("/notification", notificationHandler.Handle())
    http.ListenAndServe(":8080", nil)
}

github


測試

啟動Go程式,開啟Simple WebSocket Client的[URL]欄位輸入ws://localhost:8080/notification?userId=123發送WebSocket連線請求及query string userId=123

在RabbitMQ的hello queue的[Publish message]的[Payload]輸入下面訊息並送出,訊息會由default exchange發送至hello queue中。

{"userId":"123","content":"hello world"}

接著hello queue的訊息會被Go程式的Consumer "hello-consumer"消費並交由WebSocket連線推送給client,最後在Simple WebSocket Client的[Message Log]可看到Go程式推送來的訊息"hello world"。

最後點選Simple WebSocket Client的Close關閉連線。

從開始到結束console會印出以下訊息:

2022/12/15 01:17:00 userId=[123] websocket connection registered
2022/12/15 01:17:19 read message from websocket client error, err=websocket: close 1005 (no status)
2022/12/15 01:17:19 userId=[123] websocket connection unregistered then closed

沒有留言:

張貼留言