好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

SpringBoot整合RabbitMQ处理死信队列和延迟队列

简介

说明

本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列。

RabbitMQ消息简介

RabbitMQ的消息默认不会超时。 

什么是死信队列?什么是延迟队列?

死信队列:

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定到DLX的队列就称之为死信队列。

以下几种情况会导致消息变成死信:

消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false; 消息过期; 队列达到最大长度。

延迟队列:

延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

相关网址

详解RabbitMQ中死信队列和延迟队列的使用详解

实例代码

路由配置

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

package com.example.config;

 

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

 

@Configuration

public class RabbitRouterConfig {

     public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome" ;

     public static final String EXCHANGE_FANOUT_UNROUTE  = "Exchange@fanout.unroute" ;

     public static final String EXCHANGE_TOPIC_DELAY     = "Exchange@topic.delay" ;

 

     public static final String ROUTINGKEY_HELLOS        = "hello.#" ;

     public static final String ROUTINGKEY_DELAY         = "delay.#" ;

 

     public static final String QUEUE_HELLO              = "Queue@hello" ;

     public static final String QUEUE_HI                 = "Queue@hi" ;

     public static final String QUEUE_UNROUTE            = "Queue@unroute" ;

     public static final String QUEUE_DELAY              = "Queue@delay" ;

 

     public static final Integer TTL_QUEUE_MESSAGE       = 5000 ;

 

     @Autowired

     AmqpAdmin amqpAdmin;

 

     @Bean

     Object initBindingTest() {

         amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable( true ).autoDelete().build());

         amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable( true ).autoDelete().build());

         amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)

                 .durable( true )

                 .autoDelete()

                 .withArgument( "alternate-exchange" , EXCHANGE_FANOUT_UNROUTE)

 

                 .build());

 

         amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build());

         amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO)

                 .withArgument( "x-dead-letter-exchange" , EXCHANGE_TOPIC_DELAY)

                 .withArgument( "x-dead-letter-routing-key" , ROUTINGKEY_DELAY)

                 .withArgument( "x-message-ttl" , TTL_QUEUE_MESSAGE)

                 .build());

         amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build());

         amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build());

 

         amqpAdmin.declareBinding( new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,

                 EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null ));

         amqpAdmin.declareBinding( new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,

                 EXCHANGE_FANOUT_UNROUTE, "" , null ));

         amqpAdmin.declareBinding( new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE,

                 EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null ));

 

         return new Object();

     }

}

控制器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

package com.example.controller;

 

import com.example.config.RabbitRouterConfig;

import com.example.mq.Sender;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RestController;

 

import java.time.LocalDateTime;

 

@RestController

public class HelloController {

     @Autowired

     private Sender sender;

 

     @PostMapping ( "/hi" )

     public void hi() {

         sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());

     }

 

     @PostMapping ( "/hello1" )

     public void hello1() {

         sender.send( "hello.a" , "hello1 message:" + LocalDateTime.now());

     }

 

     @PostMapping ( "/hello2" )

     public void hello2() {

         sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b" , "hello2 message:" + LocalDateTime.now());

     }

 

     @PostMapping ( "/ae" )

     public void aeTest() {

         sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono" , "ae message:" + LocalDateTime.now());

     }

}

发送器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

package com.example.mq;

 

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

 

import java.util.Date;

 

@Component

public class Sender {

     @Autowired

     private AmqpTemplate rabbitTemplate;

 

     public void send(String routingKey, String message) {

         this .rabbitTemplate.convertAndSend(routingKey, message);

     }

 

     public void send(String exchange, String routingKey, String message) {

         this .rabbitTemplate.convertAndSend(exchange, routingKey, message);

     }

}

接收器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

package com.example.mq;

 

import com.example.config.RabbitRouterConfig;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

 

@Component

public class Receiver {

     @RabbitListener (queues = RabbitRouterConfig.QUEUE_HI)

     public void hi(String payload) {

         System.out.println ( "Receiver(hi) : "   + payload);

     }

 

     // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)

     // public void hello(String hello) throws InterruptedException {

     //     System.out.println ("Receiver(hello) : "  + hello);

     //     Thread.sleep(5 * 1000);

     //     System.out.println("(hello):sleep over");

     // }

     //

     // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)

     // public void unroute(String hello) throws InterruptedException {

     //     System.out.println ("Receiver(unroute) : "  + hello);

     //     Thread.sleep(5 * 1000);

     //     System.out.println("(unroute):sleep over");

     // }

 

     @RabbitListener (queues = RabbitRouterConfig.QUEUE_DELAY)

     public void delay(String hello) throws InterruptedException {

         System.out.println ( "Receiver(delay) : "   + hello);

         Thread.sleep( 5 * 1000 );

         System.out.println( "(delay):sleep over" );

     }

}

application.yml

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

# Application settings for the RabbitMQ dead-letter / delay-queue demo.
# The commented-out values below are alternatives to the active ones, presumably for
# running a second instance of the same application (sender vs. receiver) -- confirm.
server:

#  port: 9100

   port: 9101

spring:

   application:

#    name: demo-rabbitmq-sender

     name: demo-rabbitmq-receiver

# Connection settings for the RabbitMQ broker.
   rabbitmq:

     host: localhost

     port: 5672

     username: admin

# NOTE(review): plain-text demo credentials; externalize them outside local testing.
     password: 123456

#    virtualHost: /

# NOTE(review): newer Spring Boot releases configure publisher confirms through
# spring.rabbitmq.publisher-confirm-type instead -- confirm against the Boot version in use.
     publisher-confirms: true

     publisher-returns: true

# Manual acknowledge-mode settings for the listener containers, left disabled.
#    listener:

#      simple:

#        acknowledge-mode: manual

#      direct:

#        acknowledge-mode: manual

实例测试

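分别启动发送者和接收者(两者使用同一份application.yml:发送者启用被注释的端口9100和名称demo-rabbitmq-sender,接收者使用端口9101和名称demo-rabbitmq-receiver)。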

以POST方式请求(该接口使用@PostMapping声明):http://localhost:9100/hello2 ,例如:curl -X POST http://localhost:9100/hello2

五秒钟后输出:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548
(delay):sleep over

以上就是SpringBoot整合RabbitMQ处理死信队列和延迟队列的详细内容,更多关于SpringBoot RabbitMQ死信队列 延迟队列的资料请关注其它相关文章!

原文链接:https://blog.csdn.net/feiying0canglang/article/details/124955698

查看更多关于SpringBoot整合RabbitMQ处理死信队列和延迟队列的详细内容...

  阅读:13次