網頁

2023/1/6

Golang RabbitMQ 重建連線 reconnection

Go連線RabbitMQ時,若連線中斷時重新連線的簡單實作練習。


本篇參考Go AMQP 0.9.1 client library的API文件中的範例

範例環境:

  • Go 1.19
  • github.com/rabbitmq/amqp091-go v1.5.0


事前要求

參考「Golang RabbitMQ hello world 接收訊息範例」,本篇依此進行修改。


範例

在專案根目錄新增一個mq/rabbitmq.go內容如下。

建立一個RabbitMQ struct及以下方法。

  • Connect - 建立連線,若連線中斷會重新建立。把建立連線的邏輯放在for loop中來達到重建連線的效果,利用Connection.NotifyClose進行連線時的阻塞及監聽連線關閉。連線成功設RabbitMQ.IsReady為true,連線中斷時則設為false。
  • Consume - 連線成功後消費Rabbit MQ佇列中的訊息。把註冊消費者的邏輯放在for loop中令連線重建時可以使用新連線的Channel重新註冊。只有在連線成功RabbitMQ.IsReady為true時才會執行註冊消費者的邏輯。

mq/rabbitmq.go

package mq

import (
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitMQ struct {
    mqurl   string
    conn    *amqp.Connection
    channel *amqp.Channel
    IsReady bool
}

func NewRabbitMQ(mqurl string) RabbitMQ {
    return RabbitMQ{
        mqurl: mqurl,
    }
}

func (r *RabbitMQ) Connect(retryDelay time.Duration) {
    go func() {
        for {
            fmt.Println("attempting to connect")
            conn, err := amqp.Dial(r.mqurl)
            if err != nil {
                fmt.Println("failed to connect, retrying...")
                time.Sleep(retryDelay)
                continue
            }
            r.conn = conn

            for {
                fmt.Println("attempting to open channel")
                ch, err := r.conn.Channel()
                if err != nil {
                    fmt.Println("failed to open channel, retrying...")
                    time.Sleep(retryDelay)
                    continue
                }
                r.channel = ch
                r.IsReady = true

                <-r.conn.NotifyClose(make(chan *amqp.Error))
                r.IsReady = false
                fmt.Println("connection closed, reconnecting...")
                break
            }
        }
    }()
}

func (r *RabbitMQ) Consume() (err error) {
    for {
        if r.IsReady {
            msgs, err := r.channel.Consume(
                "hello",          // queue name
                "hello-consumer", // consumer
                true,             // auto-ack
                false,            // exclusive
                false,            // no-local
                false,            // no-wait
                nil,              // args
            )
            if err != nil {
                fmt.Println("failed to register a consumer, retrying...")
                continue
            }

            for d := range msgs {
                log.Printf("received a message: %s\n", d.Body)
            }
        }
    }
}

main.go建立RabbitMQ物件進行連線及消費訊息。

main.go

package main

import (
    "time"

    "abc.com/demo/mq"
)

func main() {
    r := mq.NewRabbitMQ("amqp://guest:guest@localhost:5672/")
    r.Connect(time.Second * 5)
    r.Consume()
}

github


沒有留言:

張貼留言