網頁

2020/8/30

Spring Boot Messaging with RabbitMQ 範例

本範例參考Spring官網Messaging with RabbitMQ,示範Spring Boot RabbitMQ的基本配置與傳送/接收訊息。


範例環境:

  • Spring Boot 2.3.2.RELEASE
  • Maven
  • Docker 19.03.12

RabbitMQ是一個訊息中介(Message Broker)伺服器,多用於分散式系統架構及微服務間的訊息溝通,藉此達到非同步溝通(asynchronous communication)的好處。


本範例建立一個Spring Boot服務並向RabbitMQ發送訊息及接收訊息。


首先在Docker上執行RabbitMQ server。在專案根目錄(其他目錄也可以)建立docker-compose.yml內容如下。

docker-compose.yml

rabbitmq:
  image: rabbitmq:management
  ports:
    - "5672:5672"
    - "15672:15672"

rabbitmq:management所下載的RabbitMQ image是包含Management Plugin的版本。
5672是RabbitMQ AMQP使用的port;
15672是RabbitMQ管理頁面入口port。

docker-compose.yml所在目錄以命令列輸入docker-compose up -d依照docker-compose.yml的配置下載RabbitMQ的docker image並以背景運行在docker container。

$ docker-compose up -d
Pulling rabbitmq (rabbitmq:management)...
management: Pulling from library/rabbitmq
...
Creating spring-boot-demo_rabbitmq_1 ... done

輸入docker ps檢視運行中的RabbitMQ container。

o$ docker ps
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                                                         NAMES
1c4826f35dbe        rabbitmq:management   "docker-entrypoint.s…"   7 seconds ago       Up 6 seconds        4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   spring-boot-demo_rabbitmq_1

輸入docker exec -it spring-boot-demo_rabbitmq_1 rabbitmqctl version查看RabbitMQ版本。
spring-boot-demo_rabbitmq_1為RabbitMQ docker container的名稱。
rabbitmqctl為RabbitMQ的指令工具,後接version可查看版本資訊。

$ docker exec -it spring-boot-demo_rabbitmq_1 rabbitmqctl version
3.8.7


建立一個Spring Boot專案,在pom.xml加入Spring Boot RabbitMQ的依賴如下。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>        

範例的pom.xml


建立Receiver負責接收訊息。此類別會在下面的RabbitMqConfig配置中設為MessageListenerAdapter的委派處理器。

Receiver

package com.abc.demo.mq.receiver;

import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

@Component
public class Receiver {

    private final CountDownLatch latch = new CountDownLatch(1);

    private String message;

    public void receiveMessage(String message) {
        System.out.println("Received message:" + message);
        this.message = message;
        latch.countDown();
    }

    public String getMessage() {
        return message;
    }

    public CountDownLatch getLatch() {
        return latch;
    }

}

CountDownLatch的用途為使執行緒等待及解除等待。初始數設為1,接收訊息後呼叫countDown()扣1,當數為0時則等待的執行緒結束等待並繼續執行。


建立類別RabbitMqConfig配置RabbitMQ的queue,exchange,綁定queue與exchange的binding,向MessageListenerContainer註冊MessageListenerAdapterConnectionFactory的實例由Spring Boot自動配置。

RabbitMqConfig

package com.abc.demo.mq.config;

import com.abc.demo.mq.receiver.Receiver;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    public static final String TOPIC_EXCHANGE_NAME = "demo-exchange";

    public static final String QUEUE_NAME = "demo-queue";

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("demo-routing-key");
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}

建立MessageService,注入RabbitTemplate來發送訊息;注入Receiver取得接收訊息。

MessageService

package com.abc.demo.service;

import com.abc.demo.mq.config.RabbitMqConfig;
import com.abc.demo.mq.receiver.Receiver;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class MessageService {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public MessageService(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.receiver = receiver;
        this.rabbitTemplate = rabbitTemplate;
        // 菜鳥工程師肉豬
    }

    public String send(String message) throws Exception {
        System.out.println("Sending message:" + message);
        rabbitTemplate.convertAndSend(
                RabbitMqConfig.TOPIC_EXCHANGE_NAME,
                "demo-routing-key",
                message);
        receiver.getLatch().await(1, TimeUnit.SECONDS);
        return receiver.getMessage();

    }

}

建立DemoController以REST API測試。test()中呼叫MessageService.send()並傳入路徑變數。

DemoController

package com.abc.demo.controller;

import com.abc.demo.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    @Autowired
    private MessageService messageService;

    @GetMapping("/test/{message}")
    public String test(@PathVariable String message) throws Exception {
        return messageService.send(message);
    }

}

啟動專案,使用Postman呼叫API GET | http://localhost:8080/demo/test/helloworld

在console會印出下面訊息

Sending message:helloworld
Received message:helloworld

Postman的回應結果如下



參考github


沒有留言:

張貼留言