好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Springcloud整合stream,rabbitmq实现消息驱动功能

springcloud整合stream,rabbitmq实现消息驱动功能

1.代码实现:

创建项目stream

添加依赖

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

< parent >

        < groupId >org.springframework.boot</ groupId >

        < artifactId >spring-boot-starter-parent</ artifactId >

        < version >2.6.2</ version >

        < relativePath /> <!-- lookup parent from repository -->

    </ parent >

    < groupId >com.cxh</ groupId >

    < artifactId >stream</ artifactId >

    < version >0.0.1-SNAPSHOT</ version >

    < name >stream</ name >

    < description >Demo project for Spring Boot</ description >

    < properties >

        < java.version >8</ java.version >

        < spring-cloud-alibaba-dependencies.version >2021.1</ spring-cloud-alibaba-dependencies.version >

        < spring-cloud-dependencies.version >2021.0.0</ spring-cloud-dependencies.version >

    </ properties >

 

    < dependencyManagement >

        < dependencies >

            < dependency >

                < groupId >org.springframework.cloud</ groupId >

                < artifactId >spring-cloud-dependencies</ artifactId >

                < version >${spring-cloud-dependencies.version}</ version >

                < type >pom</ type >

                < scope >import</ scope >

            </ dependency >

            < dependency >

                < groupId >com.alibaba.cloud</ groupId >

                < artifactId >spring-cloud-alibaba-dependencies</ artifactId >

                < version >${spring-cloud-alibaba-dependencies.version}</ version >

                < type >pom</ type >

                < scope >import</ scope >

            </ dependency >

        </ dependencies >

    </ dependencyManagement >

    < dependencies >

        < dependency >

            < groupId >org.springframework.boot</ groupId >

            < artifactId >spring-boot-starter</ artifactId >

        </ dependency >

 

        < dependency >

            < groupId >org.springframework.boot</ groupId >

            < artifactId >spring-boot-starter-test</ artifactId >

            < scope >test</ scope >

        </ dependency >

 

        < dependency >

            < groupId >org.springframework.boot</ groupId >

            < artifactId >spring-boot-starter-web</ artifactId >

        </ dependency >

 

        < dependency >

            < groupId >org.springframework.cloud</ groupId >

            < artifactId >spring-cloud-starter-stream-rabbit</ artifactId >

        </ dependency >

    </ dependencies >

监听类

?

1

2

3

4

5

6

7

8

9

10

11

@EnableBinding (Sink. class )

public class SinkReceiver {

 

     private static Logger logger = LoggerFactory.getLogger(SinkReceiver. class );

 

     @StreamListener (Sink.INPUT)

     public void receive(String payload) {

         logger.info( "Received: " + payload);

     }

 

}

2.实现效果:

启动rabbitmq, 项目stream

打开浏览器http://localhost:15672/,使用账号密码guest登录rabbitmq, 在队列中发现消息:

测试send

查看控制台消息:

com.cxh.stream.SinkReceiver              : Received: 测试send

补充

Spring Cloud Stream专门用于事件驱动的微服务系统,使用消息中间件来收发信息。使用Spring Cloud Stream可专注于业务开发,而不用花太多心思在应用与MQ之间的交互上。而且,在切换MQ后,也无须做太多的代码改动。

所以Spring Cloud Stream和RabbitMQ还可以整合实现消息的收发

整合过程

添加依赖

?

1

2

3

4

< dependency >

   < groupId >org.springframework.cloud</ groupId >

   < artifactId >spring-cloud-stream-binder-rabbit</ artifactId >

</ dependency >

不同的MQ使用不同的依赖,非常容易切换。

定义处理收发的方法

队列无非就是收和发,所以我们要先定义好,怎么样发,怎么样收。

发送消息:

?

1

2

3

4

5

6

7

8

@Bean

public Supplier<String> pkslowSource() {

   return () -> {

     String message = "HdhCmsTestpkslow测试数据" ;

     log.info( "Sending value: " + message);

     return message;

   };

}

只发送一个 String ,一般业务通常为Entity类。这里发送的内容也固定不变,实际业务可以通过查数据库,读文件等方式获取数据源。

接收消息:

?

1

2

3

4

5

6

@Bean

public Consumer<String> pkslowSink() {

   return message -> {

     log.info( "Received message " + message);

   };

}

直接打印消息即可,项目中的逻辑可按具体业务实现。

配置属性

配置RabbitMQ:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: pkslow
    password: 123456

配置 Spring Cloud Stream 的相关项:

spring:
  cloud:
    stream:
      function:
        definition: pkslowSource;pkslowSink
      bindings:
        pkslowSource-out-0:
         destination: pkslow-topic
        pkslowSink-in-0:
          destination: pkslow-topic
      poller:
        fixed-delay: 500

spring.cloud.stream.function.definition 会定义处理方法,如本文的收发消息的方法;

bindings 配置对应的 function ;destination指向MQ的主题;

这里配了一个 poller ,每隔500ms就会发送一次消息。

运行

先启动个RabbitMQ:

?

1

2

3

4

5

docker run \

-e RABBITMQ_DEFAULT_USER=pkslow \

-e RABBITMQ_DEFAULT_PASS=123456 \

-p 5672:5672 -p 15672:15672 \

rabbitmq:3.8-management

运行程序后,会自己创建主题、发送信息、接收信息:

运行日志如下:

可以看到每一次发/收大概是间隔了500ms,当然不可能是精确的500ms。

以上就是Springcloud整合stream,rabbitmq实现消息驱动功能的详细内容,更多关于Springcloud stream rabbitmq消息驱动的资料请关注其它相关文章!

原文链接:https://blog.csdn.net/weixin_39220472/article/details/122555475

查看更多关于Springcloud整合stream,rabbitmq实现消息驱动功能的详细内容...

  阅读:15次