好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷;

最终项目结构如下:

具体步骤如下:

第一步:新建SpringBoot项目 rocketmq-test ,引入rocketmq依赖,以及项目配置

?

1

2

3

4

5

<dependency>

     <groupId>org.apache.rocketmq</groupId>

     <artifactId>rocketmq-spring-boot-starter</artifactId>

     <version> 2.2 . 0 </version>

</dependency>

完整 pom.xml

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

<?xml version= "1.0" encoding= "UTF-8" ?>

<project xmlns= "http://maven.apache.org/POM/4.0.0" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"

          xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" >

     <modelVersion> 4.0 . 0 </modelVersion>

     <parent>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-parent</artifactId>

         <version> 2.5 . 4 </version>

         <relativePath/> <!-- lookup parent from repository -->

     </parent>

     <groupId>com.java1234</groupId>

     <artifactId>rocketmq-test</artifactId>

     <version> 0.0 . 1 -SNAPSHOT</version>

     <name>rocketmq-test</name>

     <description>Demo project for Spring Boot</description>

     <properties>

         <java.version> 1.8 </java.version>

     </properties>

     <dependencies>

         <dependency>

             <groupId>org.springframework.boot</groupId>

             <artifactId>spring-boot-starter</artifactId>

         </dependency>

 

         <dependency>

             <groupId>org.apache.rocketmq</groupId>

             <artifactId>rocketmq-spring-boot-starter</artifactId>

             <version> 2.2 . 0 </version>

         </dependency>

        

     </dependencies>

 

     <build>

         <plugins>

             <plugin>

                 <groupId>org.springframework.boot</groupId>

                 <artifactId>spring-boot-maven-plugin</artifactId>

             </plugin>

         </plugins>

     </build>

 

</project>

另外我们项目配置文件需要指定nameserver地址,以及消息生产者和消息消费者组;

所以配置 application.yml :

?

1

2

3

4

5

6

rocketmq:

   name-server: 127.0 . 0.1 : 9876

   producer:

     group: producer-demo1

   consumer:

     group: consumer-demo1

第二步:新建消息生产者Service类 ProducerService

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

package com.java1234.rocketmq;

 

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

 

/**

  * 消息生产者

  * @author java1234_小锋

  * @site www.java1234.com

  * @company 南通小锋网络科技有限公司

  * @create 2021-08-22 22:16

  */

@Component ( "producerService" )

public class ProducerService {

 

     @Autowired

     private RocketMQTemplate rocketMQTemplate;

 

     /**

      * 发送简单消息

      */

     public void sendMessage(){

         for ( int i= 0 ;i< 1000 ;i++){

             rocketMQTemplate.convertAndSend( "java1234-rocketmq" , "rocketmq大爷,你好!" +i);

         }

     }

}

SpringBoot给我们提供了 RocketMQTemplate 模板类,我们利用这个类可以以多种形式发送消息;

另外这个类我们要加下 @Component 注解,让Spring来管理实例,方便其他地方获取bean来使用;

发送方法指定Topic主题 java1234-rocketmq ;

第三步:启动类获取 ProducerService 实例,调用发送消息方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

package com.java1234;

 

import com.java1234.rocketmq.ProducerService;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.ConfigurableApplicationContext;

 

@SpringBootApplication

public class RocketmqTestApplication {

 

     public static void main(String[] args) {

         ConfigurableApplicationContext run = SpringApplication.run(RocketmqTestApplication. class , args);

         ProducerService producerService = (ProducerService) run.getBean( "producerService" );

         producerService.sendMessage();

     }

 

}

我们获取 ProducerService 实例,调用 sendMessage 方法发送消息;

第四步:新建消息消费者Service类 ConsumerService ,监听消息,消费消息

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

package com.java1234.rocketmq;

 

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Component;

 

/**

  * 消息消费者

  * @author java1234_小锋

  * @site www.java1234.com

  * @company 南通小锋网络科技有限公司

  * @create 2021-08-22 22:40

  */

@RocketMQMessageListener (topic = "java1234-rocketmq" ,consumerGroup = "${rocketmq.consumer.group}" )

@Component

public class ConsumerService implements RocketMQListener<String> {

 

     @Override

     public void onMessage(String s) {

         System.out.println( "收到消息内容:" +s);

     }

}

消费者类要实现 RocketMQListener 接口,以及动态指定消息类型String。

类上要加上**@RocketMQMessageListener**注解,指定topic主题 java1234-rocketmq ,以及消费者组 ${rocketmq.consumer.group}

同样这个类上也要加上 @Component 注解,让Spring来管理bean实例;

第五步:再次启动,测试消息消费

测试OK,成功消费!

到此这篇关于SpringBoot整合RocketMQ实现消息发送和接收的文章就介绍到这了,更多相关SpringBoot整合RocketMQ消息内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/caoli201314/article/details/119900570

查看更多关于SpringBoot整合RocketMQ实现消息发送和接收的详细步骤的详细内容...

  阅读:22次