Go連線RabbitMQ時,若連線中斷時重新連線的簡單實作練習。
本篇參考Go AMQP 0.9.1 client library的API文件中的範例。
範例環境:
- Go 1.19
- github.com/rabbitmq/amqp091-go v1.5.0
事前要求
參考「Golang RabbitMQ hello world 接收訊息範例」,本篇依此進行修改。
範例
在專案根目錄新增一個mq/rabbitmq.go
內容如下。
建立一個RabbitMQ
struct及以下方法。
Connect
- 建立連線,若連線中斷會重新建立。把建立連線的邏輯放在for loop中來達到重建連線的效果,利用Connection.NotifyClose
進行連線時的阻塞及監聽連線關閉。連線成功設RabbitMQ.IsReady
為true,連線中斷時則設為false。Consume
- 連線成功後消費Rabbit MQ佇列中的訊息。把註冊消費者的邏輯放在for loop中令連線重建時可以使用新連線的Channel
重新註冊。只有在連線成功RabbitMQ.IsReady
為true時才會執行註冊消費者的邏輯。
mq/rabbitmq.go
package mq
import (
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitMQ struct {
mqurl string
conn *amqp.Connection
channel *amqp.Channel
IsReady bool
}
func NewRabbitMQ(mqurl string) RabbitMQ {
return RabbitMQ{
mqurl: mqurl,
}
}
func (r *RabbitMQ) Connect(retryDelay time.Duration) {
go func() {
for {
fmt.Println("attempting to connect")
conn, err := amqp.Dial(r.mqurl)
if err != nil {
fmt.Println("failed to connect, retrying...")
time.Sleep(retryDelay)
continue
}
r.conn = conn
for {
fmt.Println("attempting to open channel")
ch, err := r.conn.Channel()
if err != nil {
fmt.Println("failed to open channel, retrying...")
time.Sleep(retryDelay)
continue
}
r.channel = ch
r.IsReady = true
<-r.conn.NotifyClose(make(chan *amqp.Error))
r.IsReady = false
fmt.Println("connection closed, reconnecting...")
break
}
}
}()
}
func (r *RabbitMQ) Consume() (err error) {
for {
if r.IsReady {
msgs, err := r.channel.Consume(
"hello", // queue name
"hello-consumer", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
fmt.Println("failed to register a consumer, retrying...")
continue
}
for d := range msgs {
log.Printf("received a message: %s\n", d.Body)
}
}
}
}
在main.go
建立RabbitMQ
物件進行連線及消費訊息。
main.go
package main
import (
"time"
"abc.com/demo/mq"
)
func main() {
r := mq.NewRabbitMQ("amqp://guest:guest@localhost:5672/")
r.Connect(time.Second * 5)
r.Consume()
}
沒有留言:
張貼留言