1. 概述
老话说的好:做人要懂得变通,善于思考,有时稍微转个弯,也许问题就解决了。
言归正传,之前我们聊了 RabbitMQ 3.9.7 镜像模式集群的搭建,今天我们来聊聊 RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合。
2. 场景说明
服务器A IP:192.168.1.22
服务器B IP:192.168.1.8
服务器C IP:192.168.1.144
此三台服务器上已搭建好了 RabbitMQ镜像模式集群,镜像模式集群的搭建,可参见我的上一篇文章。
3. 与Springboot的整合
3.1 引入依赖
1 2 3 4 5 6 |
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version> 2.5 . 5 </version> <relativePath/> <!-- lookup parent from repository --> </parent> |
1 2 3 4 |
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
3.2 生产服务配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
spring: rabbitmq: addresses: 192.168 . 1.22 : 5672 , 192.168 . 1.8 : 5672 , 192.168 . 1.144 : 5672 username: guest password: guest virtual-host: / connection-timeout: 16000
# 启用消息确认模式 publisher-confirm-type: correlated
# 启用 return 消息模式 publisher-returns: true template: mandatory: true |
3.3 生产服务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;
import java.util.Map;
@Component public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
/** * 确认回调 */ final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // correlationData 唯一标识 // ack mq是否收到消息 // cause 失败原因 System.out.println( "correlationData:" + correlationData.getId()); System.out.println( "ack:" + ack); System.out.println( "cause:" + cause);
} };
/** * 发送消息 * @param messageBody 消息体 * @param headers 附加属性 * @throws Exception */ public void sendMessage(String messageBody, Map<String, Object> headers, String id) throws Exception {
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<String> message = MessageBuilder.createMessage(messageBody, messageHeaders);
rabbitTemplate.setConfirmCallback(confirmCallback);
String exchangeName = "exchange-hello" ; String routingKey = "test.123" ;
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {
/** * 发送消息后做的事情 * @param message * @return * @throws AmqpException */ @Override public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException { return message; } }, correlationData); } } |
3.4 消费服务配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
spring: rabbitmq: addresses: 192.168 . 1.22 : 5672 , 192.168 . 1.8 : 5672 , 192.168 . 1.144 : 5672 username: guest password: guest virtual-host: / connection-timeout: 16000
listener: simple: # 设置为手工ACK acknowledge-mode: manual concurrency: 5 prefetch: 1 max-concurrency: 10 |
3.5 消费服务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component;
@Component public class Consumer {
@RabbitListener (bindings = @QueueBinding ( value = @Queue (value = "queue-hello" , durable = "true" ), exchange = @Exchange (value = "exchange-hello" , durable = "true" , type = "topic" ), key = "test.*" )) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception {
System.out.println( "收到消息:" + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false ); } } |
3.6 Rest 测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@RestController @RequestMapping ( "/mq" ) public class RabbitmqController {
@Autowired private Producer producer;
@GetMapping ( "/sendMessage" ) public String sendMessage( @RequestParam String messageBody, @RequestParam String id) throws Exception { Map<String, Object> headers = new HashMap<>(); producer.sendMessage(messageBody, headers, id); return "success" ; } } |
4. 综述
到此这篇关于RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合的文章就介绍到这了,更多相关RabbitMQ镜像模式集群内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://www.cnblogs.com/w84422/p/15361662.html
查看更多关于RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合的详细内容...