This example follows the Spring guide Messaging with RabbitMQ and demonstrates the basic Spring Boot RabbitMQ configuration for sending and receiving messages.
Environment:
- Spring Boot 2.3.2.RELEASE
- Maven
- Docker 19.03.12
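RabbitMQ is a message broker server. It is commonly used for messaging in distributed systems and between microservices, where it lets services communicate asynchronously.
This example builds a Spring Boot service that sends messages to RabbitMQ and receives them back.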
First, run a RabbitMQ server on Docker. Create docker-compose.yml in the project root (any other directory also works) with the following content.
docker-compose.yml
rabbitmq:
  image: rabbitmq:management
  ports:
    - "5672:5672"
    - "15672:15672"
rabbitmq:management is the RabbitMQ image that includes the Management Plugin. Port 5672 is the port RabbitMQ uses for AMQP; port 15672 is the entry point of the RabbitMQ management UI.
In the directory that contains docker-compose.yml, run docker-compose up -d on the command line. This pulls the RabbitMQ Docker image as configured in docker-compose.yml and runs it in a container in the background.
$ docker-compose up -d
Pulling rabbitmq (rabbitmq:management)...
management: Pulling from library/rabbitmq
...
Creating spring-boot-demo_rabbitmq_1 ... done
Run docker ps to list the running RabbitMQ container.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1c4826f35dbe rabbitmq:management "docker-entrypoint.s…" 7 seconds ago Up 6 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp spring-boot-demo_rabbitmq_1
Run docker exec -it spring-boot-demo_rabbitmq_1 rabbitmqctl version to check the RabbitMQ version. spring-boot-demo_rabbitmq_1 is the name of the RabbitMQ Docker container, and rabbitmqctl is RabbitMQ's command-line tool; its version subcommand prints the version.
$ docker exec -it spring-boot-demo_rabbitmq_1 rabbitmqctl version
3.8.7
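Before moving on to the Spring Boot side, it can be useful to confirm from Java that the ports published by the container are reachable. The following is a minimal sketch using only the JDK; PortCheck and isPortOpen are hypothetical names introduced here for illustration, and the host localhost assumes Docker publishes the ports on the local machine.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or timed out.
            return false;
        }
    }

    public static void main(String[] args) {
        // 5672 is the AMQP port and 15672 the management UI port from docker-compose.yml.
        System.out.println("AMQP 5672 reachable: " + isPortOpen("localhost", 5672, 1000));
        System.out.println("Management 15672 reachable: " + isPortOpen("localhost", 15672, 1000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; it does not verify AMQP credentials or broker health.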
Create a Spring Boot project and add the Spring Boot RabbitMQ dependency to pom.xml as follows.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
See the example's pom.xml for the complete file.
Create Receiver, which is responsible for receiving messages. In the RabbitMqConfig configuration further down, this class is set as the delegate of MessageListenerAdapter.
Receiver
package com.abc.demo.mq.receiver;

import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

@Component
public class Receiver {

    private final CountDownLatch latch = new CountDownLatch(1);
    private String message;

    public void receiveMessage(String message) {
        System.out.println("Received message:" + message);
        this.message = message;
        latch.countDown();
    }

    public String getMessage() {
        return message;
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}
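Receiver implements no messaging interface; the adapter is simply told which method name to call on it. The idea of delegating to a method by name can be sketched with plain Java reflection. This is a simplified illustration of the concept, not Spring's actual implementation; DelegateSketch, PlainReceiver, and deliver are hypothetical names.

```java
import java.lang.reflect.Method;

class DelegateSketch {

    /** Stand-in for the article's Receiver: a plain object with one public handler method. */
    static class PlainReceiver {
        String last;

        public void receiveMessage(String message) {
            last = message;
        }
    }

    /** Looks up a public method by name that takes one String and invokes it on the delegate. */
    static void deliver(Object delegate, String methodName, String payload) {
        try {
            Method method = delegate.getClass().getMethod(methodName, String.class);
            method.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        PlainReceiver receiver = new PlainReceiver();
        deliver(receiver, "receiveMessage", "helloworld");
        System.out.println(receiver.last); // helloworld
    }
}
```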
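CountDownLatch makes a thread wait and later releases it. The count starts at 1, and countDown() is called after a message is received to decrement it by 1. When the count reaches 0, any waiting thread stops waiting and continues. Note that a CountDownLatch cannot be reset, so only the first message is actually waited for; once the count is 0, later calls to await() return immediately.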
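The waiting and releasing behaviour can be tried in isolation with a small JDK-only sketch. LatchDemo and waitForWorker are hypothetical names used only for this illustration; the worker thread plays the role of the message listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchDemo {

    /**
     * Waits up to timeoutMillis on a latch with count 1.
     * Returns true if the count reached 0 in time, false if the wait timed out.
     */
    static boolean waitForWorker(boolean workerCountsDown, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        if (workerCountsDown) {
            // The worker stands in for the listener thread that receives a message.
            new Thread(latch::countDown).start();
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForWorker(true, 2000)); // true: countDown() released the waiter
        System.out.println(waitForWorker(false, 100)); // false: nobody counted down, await timed out
    }
}
```

The boolean returned by await() is the only way to tell a timeout from a successful release, so it is worth checking when a missing message matters.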
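Create the RabbitMqConfig class to configure the RabbitMQ queue, the exchange, and the binding that ties the queue to the exchange, and to register the MessageListenerAdapter with a MessageListenerContainer. The ConnectionFactory instance is auto-configured by Spring Boot.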
RabbitMqConfig
package com.abc.demo.mq.config;

import com.abc.demo.mq.receiver.Receiver;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    public static final String TOPIC_EXCHANGE_NAME = "demo-exchange";
    public static final String QUEUE_NAME = "demo-queue";

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("demo-routing-key");
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}
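The binding above uses a TopicExchange with the literal key demo-routing-key, so only messages published with exactly that routing key reach demo-queue. Topic exchanges also accept wildcard binding keys, where * stands for exactly one dot-separated word and # for zero or more words. The matching rule can be sketched in plain Java as follows. This is an illustration only: the real matching happens inside the broker, and TopicMatch and matches are hypothetical names, not RabbitMQ or Spring API.

```java
class TopicMatch {

    /** Returns true if routingKey matches bindingKey under topic-exchange wildcard rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: match only if every routing-key word was consumed.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = j; next <= words.length; next++) {
                if (matches(pattern, i + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word, but none are left
        }
        // '*' matches exactly one word; anything else must match literally.
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(words[j]);
        return wordMatches && matches(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("demo-routing-key", "demo-routing-key")); // true
        System.out.println(matches("demo-routing-key", "other-key"));        // false
        System.out.println(matches("demo.*", "demo.a"));                     // true
        System.out.println(matches("demo.*", "demo.a.b"));                   // false
        System.out.println(matches("demo.#", "demo.a.b"));                   // true
    }
}
```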
Create MessageService. It injects RabbitTemplate to send messages and Receiver to obtain the received message.
MessageService
package com.abc.demo.service;

import com.abc.demo.mq.config.RabbitMqConfig;
import com.abc.demo.mq.receiver.Receiver;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class MessageService {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public MessageService(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.receiver = receiver;
        this.rabbitTemplate = rabbitTemplate;
    }

    public String send(String message) throws Exception {
        System.out.println("Sending message:" + message);
        rabbitTemplate.convertAndSend(
                RabbitMqConfig.TOPIC_EXCHANGE_NAME,
                "demo-routing-key",
                message);
        receiver.getLatch().await(1, TimeUnit.SECONDS);
        return receiver.getMessage();
    }
}
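The send-then-wait flow in send() can be tried without a broker. In the sketch below a LinkedBlockingQueue stands in for demo-queue and a background thread stands in for the listener container; this is an in-memory substitute for illustration, not how RabbitMQ delivers messages. InMemoryFlow and sendAndWait are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class InMemoryFlow {

    /** Publishes a message to an in-memory queue and waits up to 1 second for the listener. */
    static String sendAndWait(String message) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for demo-queue
        CountDownLatch latch = new CountDownLatch(1);              // fresh latch for this call
        String[] received = new String[1];

        // Stand-in for the listener container: takes one message and signals the sender.
        Thread listener = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until a message is available
                latch.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();

        queue.add(message); // stand-in for rabbitTemplate.convertAndSend(...)
        try {
            boolean arrived = latch.await(1, TimeUnit.SECONDS);
            return arrived ? received[0] : null; // null signals a timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("helloworld")); // helloworld
    }
}
```

Unlike the shared latch held by Receiver, this sketch creates a new latch per call and checks the result of await(), so every call really waits for its own message and a timeout is reported instead of returning an earlier value.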
Create DemoController to test through a REST API. test() calls MessageService.send() and passes in the path variable.
DemoController
package com.abc.demo.controller;

import com.abc.demo.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    @Autowired
    private MessageService messageService;

    @GetMapping("/test/{message}")
    public String test(@PathVariable String message) throws Exception {
        return messageService.send(message);
    }
}
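Start the project and call the API with Postman: GET http://localhost:8080/demo/test/helloworld. The /demo prefix is not defined in the controller, so it presumably comes from a context path set in the project's configuration, which is not shown here.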
The console prints the following:
Sending message:helloworld
Received message:helloworld
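Postman shows the response. Because send() returns the message obtained from Receiver, the response body should be helloworld.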
See GitHub for the source code.