網頁

2022/11/17

Golang RabbitMQ hello world 接收訊息範例

Go程式從RabbitMQ接收訊息hello world範例。


範例環境:

  • Go 1.19
  • github.com/rabbitmq/amqp091-go v1.5.0

事前要求

參考Golang RabbitMQ hello world 發送訊息範例


安裝套件

在專案根目錄以命令列輸入go get github.com/rabbitmq/amqp091-go下載RabbitMQ官方維護的Go RabbitMQ Client Library,Go利用此套件與RabbitMQ溝通。


範例程式

匯入amqp套件amqp "github.com/rabbitmq/amqp091-go"

呼叫amqp.Dial(url string)取得連線物件。url參數為RabbitMQ的AMQP帳密及位址。例如在本機的RabbitMQ位址預設為amqp://guest:guest@localhost:5672/

呼叫Connection.Channel()建立與RabbitMQ的訊息通道。

呼叫Channel.Consume()建立Consumer接收訊息,參數如下:

  • queue string - queue名稱。
  • consumer string - consumer名稱。
  • autoAck bool - 是否自動確認訊息已送,true則queue發出訊息後便自動刪除訊息不論consumer是否消費。
  • exclusive bool - 此consumer是否為queue的唯一消費者。
  • noLocal bool - RabbitMQ不支援。
  • noWait bool - 是否不等待server確認訊息可送立刻消費,若無法消費則拋出例外。
  • args Table - 其他參數。

Channel.Consume()返回一個Deliverychan物件為要消費的訊息,consumer會持續把queue的訊息傳送到此channel。用一個無窮迴圈去持續接收consumer傳來的訊息若,channel無訊息則阻塞。

main.go

package main

import (
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    // 取得連線
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // 建立通道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 註冊consumer
    msgs, err := ch.Consume(
        "hello",          // queue name
        "hello-consumer", // consumer name
        true,             // auto-ack
        false,            // exclusive
        false,            // no-local
        false,            // no-wait
        nil,              // args
    )
    failOnError(err, "Failed to register a consumer")

    var forever chan struct{} // 阻塞main goroutine

    // 開啟一個goroutine去非同步接收訊息
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf("Waiting for messages. To exit press CTRL+C\n")
    <-forever // 阻塞main goroutine
}

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

github



測試

執行程式返回如下。

2022/11/18 13:53:08 Waiting for messages. To exit press CTRL+C
2022/11/18 13:53:08 Received a message: hello world

前往RabbitMQ UI管理介面的[Queue]頁面可看到queue "hello"中無待收訊息,且在[Consumers]項下有一consumer為"hello-consumer"。




MessageHandler

把上面範例的goroutine接收訊息抽出為Receive函式,並把訊息的處理委派給外部傳入的MessageHandler函式參數處理(即callback)。

main.go

package main

import (
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    // 取得連線
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // 建立通道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 註冊consumer
    msgs, err := ch.Consume(
        "hello",          // queue name
        "hello-consumer", // consumer name
        true,             // auto-ack
        false,            // exclusive
        false,            // no-local
        false,            // no-wait
        nil,              // args
    )
    failOnError(err, "Failed to register a consumer")

    var forever chan struct{} // 阻塞main goroutine

    Receive(msgs, func(b []byte) {
        log.Printf("Received a message: %s", string(b))
    })

    log.Printf("Waiting for messages. To exit press CTRL+C\n")
    <-forever // 阻塞main goroutine
}

type MessageHandler func(b []byte)

// 開啟一個goroutine去非同步接收訊息,並委派MessageHandler處理收來的訊息
func Receive(msgs <-chan amqp.Delivery, handler MessageHandler) {
    go func() {
        for d := range msgs {
            handler(d.Body)
        }
    }()
}

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}


沒有留言:

張貼留言