Go程式從RabbitMQ接收訊息hello world範例。
範例環境:
- Go 1.19
- github.com/rabbitmq/amqp091-go v1.5.0
事前要求
參考Golang RabbitMQ hello world 發送訊息範例。
安裝套件
在專案根目錄以命令列輸入go get github.com/rabbitmq/amqp091-go
下載RabbitMQ官方維護的Go RabbitMQ Client Library,Go利用此套件與RabbitMQ溝通。
範例程式
匯入amqp套件amqp "github.com/rabbitmq/amqp091-go"
。
呼叫amqp.Dial(url string)
取得連線物件。url
參數為RabbitMQ的AMQP帳密及位址。例如在本機的RabbitMQ位址預設為amqp://guest:guest@localhost:5672/
。
呼叫Connection.Channel()
建立與RabbitMQ的訊息通道。
呼叫Channel.Consume()
建立Consumer接收訊息,參數如下:
queue string
- queue名稱。consumer string
- consumer名稱。autoAck bool
- 是否自動確認訊息已送,true
則queue發出訊息後便自動刪除訊息不論consumer是否消費。exclusive bool
- 此consumer是否為queue的唯一消費者。noLocal bool
- RabbitMQ不支援。noWait bool
- 是否不等待server確認訊息可送立刻消費,若無法消費則拋出例外。args Table
- 其他參數。
Channel.Consume()
返回一個Delivery
的chan
物件為要消費的訊息,consumer會持續把queue的訊息傳送到此channel。用一個無窮迴圈去持續接收consumer傳來的訊息若,channel無訊息則阻塞。
main.go
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 取得連線
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 建立通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 註冊consumer
msgs, err := ch.Consume(
"hello", // queue name
"hello-consumer", // consumer name
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{} // 阻塞main goroutine
// 開啟一個goroutine去非同步接收訊息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("Waiting for messages. To exit press CTRL+C\n")
<-forever // 阻塞main goroutine
}
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
測試
執行程式返回如下。
2022/11/18 13:53:08 Waiting for messages. To exit press CTRL+C
2022/11/18 13:53:08 Received a message: hello world
前往RabbitMQ UI管理介面的[Queue]頁面可看到queue "hello"中無待收訊息,且在[Consumers]項下有一consumer為"hello-consumer"。
MessageHandler
把上面範例的goroutine接收訊息抽出為Receive
函式,並把訊息的處理委派給外部傳入的MessageHandler
函式參數處理(即callback)。
main.go
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 取得連線
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 建立通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 註冊consumer
msgs, err := ch.Consume(
"hello", // queue name
"hello-consumer", // consumer name
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{} // 阻塞main goroutine
Receive(msgs, func(b []byte) {
log.Printf("Received a message: %s", string(b))
})
log.Printf("Waiting for messages. To exit press CTRL+C\n")
<-forever // 阻塞main goroutine
}
type MessageHandler func(b []byte)
// 開啟一個goroutine去非同步接收訊息,並委派MessageHandler處理收來的訊息
func Receive(msgs <-chan amqp.Delivery, handler MessageHandler) {
go func() {
for d := range msgs {
handler(d.Body)
}
}()
}
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
沒有留言:
張貼留言