好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Springboot 配置RabbitMQ文档的方法步骤

简介

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

概念:

生产者:消息的产生方,负责将消息推送到消息队列。
消费者:消息的最终接收方,负责监听队列中的对应消息并消费消息。
队列:消息的寄存器,负责存放生产者发送的消息。
交换机:负责根据一定规则分发生产者产生的消息。
绑定:完成交换机和队列之间的绑定。

模式:

direct:直连模式,用于实例间的任务分发。
topic:话题模式,通过可配置的规则分发给绑定在该 exchange 上的队列。
headers:适用规则复杂的分发,用 headers 里的参数表达规则。
fanout:分发给所有绑定到该 exchange 上的队列,忽略 routing key。

springboot集成rabbitmq

一、引入maven依赖

?

1

2

3

4

5

<!-- Spring Boot starter that pulls in Spring AMQP and the RabbitMQ client. -->
<!-- Maven element names are case-sensitive: groupId / artifactId, not groupid / artifactid. -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>1.5.2.RELEASE</version>
</dependency>

二、配置application.properties

?

1

2

3

4

5

6

# rabbitmq
# Connection settings bound to Spring Boot's RabbitProperties (prefix spring.rabbitmq).
# The host below is a placeholder; replace it with the address of your own broker.
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
# Sample credentials only; do not use admin/admin outside a local test broker.
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtual-host = /message-test/

三、编写amqpconfiguration配置文件

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

package message.test.configuration;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

 

@configuration

public class amqpconfiguration {

/**

  * 消息编码

  */

  public static final string message_encoding = "utf-8" ;

  public static final string exchange_issue = "exchange_message_issue" ;

  public static final string queue_issue_user = "queue_message_issue_user" ;

  public static final string queue_issue_all_user = "queue_message_issue_all_user" ;

  public static final string queue_issue_all_device = "queue_message_issue_all_device" ;

  public static final string queue_issue_city = "queue_message_issue_city" ;

  public static final string routing_key_issue_user = "routing_key_message_issue_user" ;

  public static final string routing_key_issue_all_user = "routing_key_message_issue_all_user" ;

  public static final string routing_key_issue_all_device = "routing_key_message_issue_all_device" ;

  public static final string routing_key_issue_city = "routing_key_message_issue_city" ;

  public static final string exchange_push = "exchange_message_push" ;

  public static final string queue_push_result = "queue_message_push_result" ;

 

  @autowired

  private rabbitproperties rabbitproperties;

 

  @bean

  public queue issueuserqueue() {

   return new queue(queue_issue_user);

  }

 

  @bean

  public queue issuealluserqueue() {

   return new queue(queue_issue_all_user);

  }

 

  @bean

  public queue issuealldevicequeue() {

   return new queue(queue_issue_all_device);

  }

 

  @bean

  public queue issuecityqueue() {

   return new queue(queue_issue_city);

  }

 

  @bean

  public queue pushresultqueue() {

   return new queue(queue_push_result);

  }

 

  @bean

  public directexchange issueexchange() {

   return new directexchange(exchange_issue);

  }

 

  @bean

  public directexchange pushexchange() {

   // 参数1:队列

   // 参数2:是否持久化

   // 参数3:是否自动删除

   return new directexchange(exchange_push, true , true );

  }

 

  @bean

  public binding issueuserqueuebinding( @qualifier ( "issueuserqueue" ) queue queue,

     @qualifier ( "issueexchange" ) directexchange exchange) {

    return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_user);

  }

 

  @bean

  public binding issuealluserqueuebinding( @qualifier ( "issuealluserqueue" ) queue queue,

     @qualifier ( "issueexchange" ) directexchange exchange) {

   return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_user);

  }

 

  @bean

  public binding issuealldevicequeuebinding( @qualifier ( "issuealldevicequeue" ) queue queue,

     @qualifier ( "issueexchange" ) directexchange exchange) {

   return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_device);

  }

 

  @bean

  public binding issuecityqueuebinding( @qualifier ( "issuecityqueue" ) queue queue,

     @qualifier ( "issueexchange" ) directexchange exchange) {

   return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_city);

  }

 

  @bean

  public binding pushresultqueuebinding( @qualifier ( "pushresultqueue" ) queue queue,

     @qualifier ( "pushexchange" ) directexchange exchange) {

   return bindingbuilder.bind(queue).to(exchange).withqueuename();

  }

 

  @bean

  public connectionfactory defaultconnectionfactory() {

   cachingconnectionfactory connectionfactory = new cachingconnectionfactory();

   connectionfactory.sethost(rabbitproperties.gethost());

   connectionfactory.setport(rabbitproperties.getport());

   connectionfactory.setusername(rabbitproperties.getusername());

   connectionfactory.setpassword(rabbitproperties.getpassword());

   connectionfactory.setvirtualhost(rabbitproperties.getvirtualhost());

   return connectionfactory;

  }

 

  @bean

  public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory(

     @qualifier ( "defaultconnectionfactory" ) connectionfactory connectionfactory) {

   simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();

   factory.setconnectionfactory(connectionfactory);

   factory.setacknowledgemode(acknowledgemode.manual);

   return factory;

  }

 

  @bean

  public amqptemplate rabbittemplate( @qualifier ( "defaultconnectionfactory" ) connectionfactory connectionfactory)

  {

   return new rabbittemplate(connectionfactory);

  }

}

三、编写生产者

?

1

2

3

// Serialize the message to JSON, encode it with the shared charset name, and publish it to the
// issue exchange under the per-user routing key.
// NOTE(review): JSON is presumably com.alibaba.fastjson.JSON; confirm against the enclosing
// file's imports. String.getBytes(String) declares UnsupportedEncodingException, so the
// enclosing method has to handle or declare it.
body = JSON.toJSONString(issuemessage).getBytes(amqpconfiguration.message_encoding);
rabbittemplate.convertAndSend(amqpconfiguration.exchange_issue,
        amqpconfiguration.routing_key_issue_user, body);

四、编写消费者

?

1

2

3

4

5

@rabbitlistener (queues = amqpconfiguration.queue_push_result)

public void handlepushresult( @payload byte [] data, channel channel,

     @header (amqpheaders.delivery_tag) long deliverytag) {

    

}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

原文链接:https://segmentfault.com/a/1190000018555963

查看更多关于Springboot 配置RabbitMQ文档的方法步骤的详细内容...

  阅读:12次