好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合

1. 概述

老话说的好:做人要懂得变通,善于思考,有时稍微转个弯,也许问题就解决了。

言归正传,之前我们聊了 RabbitMQ 3.9.7 镜像模式集群的搭建,今天我们来聊聊 RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合。

2. 场景说明

服务器A IP:192.168.1.22

服务器B IP:192.168.1.8

服务器C IP:192.168.1.144

此三台服务器上已搭建好了 RabbitMQ镜像模式集群,镜像模式集群的搭建,可参见我的上一篇文章。

3. 与Springboot的整合

3.1 引入依赖

?

1

2

3

4

5

6

<parent>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-parent</artifactId>

         <version> 2.5 . 5 </version>

         <relativePath/> <!-- lookup parent from repository -->

     </parent>

?

1

2

3

4

<dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-amqp</artifactId>

        </dependency>

3.2 生产服务配置

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

spring:

   rabbitmq:

     addresses: 192.168 . 1.22 : 5672 , 192.168 . 1.8 : 5672 , 192.168 . 1.144 : 5672

     username: guest

     password: guest

     virtual-host: /

     connection-timeout: 16000

 

     # 启用消息确认模式

     publisher-confirm-type: correlated

 

     # 启用 return 消息模式

     publisher-returns: true

     template:

       mandatory: true

3.3 生产服务代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageHeaders;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Component;

 

import java.util.Map;

 

@Component

public class Producer {

 

     @Autowired

     private RabbitTemplate rabbitTemplate;

 

     /**

      * 确认回调

      */

     final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

 

         @Override

         public void confirm(CorrelationData correlationData, boolean ack, String cause) {

             // correlationData 唯一标识

             // ack mq是否收到消息

             // cause 失败原因

             System.out.println( "correlationData:" + correlationData.getId());

             System.out.println( "ack:" + ack);

             System.out.println( "cause:" + cause);

 

         }

     };

 

     /**

      * 发送消息

      * @param messageBody   消息体

      * @param headers       附加属性

      * @throws Exception

      */

     public void sendMessage(String messageBody, Map<String, Object> headers, String id) throws Exception {

 

         MessageHeaders messageHeaders = new MessageHeaders(headers);

 

         Message<String> message = MessageBuilder.createMessage(messageBody, messageHeaders);

 

         rabbitTemplate.setConfirmCallback(confirmCallback);

 

         String exchangeName = "exchange-hello" ;

         String routingKey = "test.123" ;

 

         CorrelationData correlationData = new CorrelationData(id);

 

         rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {

 

             /**

              * 发送消息后做的事情

              * @param message

              * @return

              * @throws AmqpException

              */

             @Override

             public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {

                 return message;

             }

         }, correlationData);

     }

}

3.4 消费服务配置

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

spring:

   rabbitmq:

     addresses: 192.168 . 1.22 : 5672 , 192.168 . 1.8 : 5672 , 192.168 . 1.144 : 5672

     username: guest

     password: guest

     virtual-host: /

     connection-timeout: 16000

 

     listener:

       simple:

         # 设置为手工ACK

         acknowledge-mode: manual

         concurrency: 5

         prefetch: 1

         max-concurrency: 10

3.5 消费服务代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

import com.rabbitmq.client.Channel;

import org.springframework.amqp.rabbit.annotation.*;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.messaging.Message;

import org.springframework.stereotype.Component;

 

@Component

public class Consumer {

 

     @RabbitListener (bindings = @QueueBinding (

             value = @Queue (value = "queue-hello" , durable = "true" ),

             exchange = @Exchange (value = "exchange-hello" , durable = "true" , type = "topic" ),

             key = "test.*"

     ))

     @RabbitHandler

     public void onMessage(Message message, Channel channel) throws Exception {

 

         System.out.println( "收到消息:" + message.getPayload());

 

         Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

         channel.basicAck(deliveryTag, false );

     }

}

3.6 Rest 测试代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

@RestController

@RequestMapping ( "/mq" )

public class RabbitmqController {

 

     @Autowired

     private Producer producer;

 

     @GetMapping ( "/sendMessage" )

     public String sendMessage( @RequestParam String messageBody, @RequestParam String id) throws Exception {

         Map<String, Object> headers = new HashMap<>();

         producer.sendMessage(messageBody, headers, id);

         return "success" ;

     }

}

4. 综述

到此这篇关于RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合的文章就介绍到这了,更多相关RabbitMQ镜像模式集群内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://www.cnblogs.com/w84422/p/15361662.html

查看更多关于RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合的详细内容...

  阅读:17次