Integrating the Spring Integration Java DSL with Spring Boot: implementation code

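The Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move because:

Everyone who starts a new Spring project based on Java config uses it.

The SI Java DSL lets you use powerful new Java 8 features such as lambdas.

You can build flows with the builder pattern, based on IntegrationFlowBuilder.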
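To give a feel for what "builder pattern plus lambdas" means, here is a minimal plain-Java sketch. It is not Spring Integration code: MiniFlow and MiniFlowDemo are hypothetical names made up for illustration, and the real DSL offers far more than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical mini flow builder; NOT part of Spring Integration. */
class MiniFlow<T> {
    // Accumulated processing steps, from raw input payload to current type T.
    private final Function<Object, T> pipeline;

    private MiniFlow(Function<Object, T> pipeline) {
        this.pipeline = pipeline;
    }

    /** Starts a flow whose input payloads are expected to be of type T. */
    static <T> MiniFlow<T> from(Class<T> inputType) {
        return new MiniFlow<>(inputType::cast);
    }

    /** Appends a transformation step and returns a new builder stage. */
    <R> MiniFlow<R> transform(Function<? super T, ? extends R> step) {
        return new MiniFlow<>(pipeline.andThen(step));
    }

    /** Terminates the flow with a handler; the result accepts input payloads. */
    Consumer<Object> handle(Consumer<? super T> handler) {
        return payload -> handler.accept(pipeline.apply(payload));
    }
}

class MiniFlowDemo {
    /** Builds a trim -> upper-case -> collect flow and feeds it the inputs. */
    static List<String> run(List<String> inputs) {
        List<String> seen = new ArrayList<>();
        Consumer<Object> flow = MiniFlow.from(String.class)
                .transform(String::trim)
                .transform(String::toUpperCase)
                .handle(seen::add);
        inputs.forEach(flow);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(" hello ", "jms")));
    }
}
```

Each chained call returns a new builder stage, so the whole flow reads top to bottom as one expression, which is the style the DSL examples below follow.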

Let's look at how to use it with an example based on ActiveMQ JMS.

Maven dependencies:

<dependencies>
   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-activemq</artifactId>
   </dependency>

   <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-core</artifactId>
   </dependency>

   <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-jms</artifactId>
   </dependency>

   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-test</artifactId>
     <scope>test</scope>
   </dependency>

   <dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-kahadb-store</artifactId>
   </dependency>

   <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
   <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-java-dsl</artifactId>
     <version>1.2.3.RELEASE</version>
   </dependency>
</dependencies>

Example 1: JMS inbound gateway

We have the following ServiceActivator:

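import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;

@Service
public class ActiveMQEndpoint {
   // Invoked for every message that arrives on the "inboundChannel" channel.
   @ServiceActivator(inputChannel = "inboundChannel")
   public void processMessage(final String inboundPayload) {
     System.out.println("inbound message: " + inboundPayload);
   }
}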

If you want to use the SI Java DSL to deliver inboundPayload from a JMS queue to the activator in gateway style, use the DSL Jms factory:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
   return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
   return new ActiveMQConnectionFactory();
}

// Listener container that consumes from the "jms.activemq.test" queue.
@Bean
public DefaultMessageListenerContainer listenerContainer() {
   final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
   defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
   defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
   defaultMessageListenerContainer.setDestinationName("jms.activemq.test");
   return defaultMessageListenerContainer;
}

// Channel the service activator listens on.
@Bean
public MessageChannel inboundChannel() {
   return MessageChannels.direct("inboundChannel").get();
}

// Gateway that forwards each JMS message to inboundChannel.
@Bean
public JmsInboundGateway dataEndpoint() {
   return Jms.inboundGateway(listenerContainer())
       .requestChannel(inboundChannel()).get();
}

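Through the JmsInboundGatewaySpec that Jms.inboundGateway(...) returns inside the dataEndpoint bean, you can also configure replies to an SI channel or to a JMS destination. See the documentation for details.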
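The difference between one-way delivery and the request/reply style of a gateway can be pictured in plain Java. This is only a conceptual sketch under my own assumptions: EndpointStyles and its methods are hypothetical names, and no JMS or Spring Integration API is involved.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical illustration of one-way versus request/reply delivery. */
class EndpointStyles {

    /** Adapter style: the handler consumes the payload and no reply is produced. */
    static Optional<String> oneWay(String payload, Consumer<String> handler) {
        handler.accept(payload);
        return Optional.empty();
    }

    /** Gateway style: whatever the handler returns becomes the reply. */
    static Optional<String> requestReply(String payload, Function<String, String> handler) {
        return Optional.ofNullable(handler.apply(payload));
    }

    public static void main(String[] args) {
        System.out.println(oneWay("ping", p -> System.out.println("inbound message: " + p)));
        System.out.println(requestReply("ping", p -> "ack: " + p));
    }
}
```

In the real gateway the reply would travel back over a reply channel or JMS destination rather than as a return value; the sketch only shows which side produces it.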

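Example 2: JMS message-driven channel adapter

If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go: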

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
   return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
   return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
   final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
   defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
   defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
   defaultMessageListenerContainer.setDestinationName("jms.activemq.test");
   return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
   return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
   // One-way delivery: the listener does not wait for a reply.
   final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
       new ChannelPublishingJmsMessageListener();
   channelPublishingJmsMessageListener.setExpectReply(false);
   final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
       new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);

   // Every received message is published to inboundChannel.
   messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
   return messageDrivenChannelAdapter;
}

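As in the previous example, the inbound payload is delivered to the activator shown in example 1.

Example 3: JMS message-driven channel adapter with JAXB

In a typical scenario, you want to accept XML as a text message over JMS, convert it into a JAXB stub, and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing: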

   <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-xml</artifactId>
   </dependency>

   <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-oxm</artifactId>
   </dependency>

We will accept ship orders over JMS, so first we need an XSD, named shiporder.xsd:

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

   <xs:element name="shiporder">
     <xs:complexType>
       <xs:sequence>
         <xs:element name="orderperson" type="xs:string"/>
         <xs:element name="shipto">
           <xs:complexType>
             <xs:sequence>
               <xs:element name="name" type="xs:string"/>
               <xs:element name="address" type="xs:string"/>
               <xs:element name="city" type="xs:string"/>
               <xs:element name="country" type="xs:string"/>
             </xs:sequence>
           </xs:complexType>
         </xs:element>
         <xs:element name="item" maxOccurs="unbounded">
           <xs:complexType>
             <xs:sequence>
               <xs:element name="title" type="xs:string"/>
               <xs:element name="note" type="xs:string" minOccurs="0"/>
               <xs:element name="quantity" type="xs:positiveInteger"/>
               <xs:element name="price" type="xs:decimal"/>
             </xs:sequence>
           </xs:complexType>
         </xs:element>
       </xs:sequence>
       <xs:attribute name="orderid" type="xs:string" use="required"/>
     </xs:complexType>
   </xs:element>

</xs:schema>

Add the JAXB Maven plugin to generate the JAXB stubs:

    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>jaxb2-maven-plugin</artifactId>
      <version>2.3.1</version>
      <executions>
        <execution>
          <id>xjc-schema1</id>
          <goals>
            <goal>xjc</goal>
          </goals>
          <configuration>
            <!-- XSD files to generate sources from. -->
            <sources>
              <source>src/main/resources/xsds/shiporder.xsd</source>
            </sources>

            <!-- Package name of the generated sources. -->
            <packageName>com.example.stubs</packageName>
            <outputDirectory>src/main/java</outputDirectory>
            <clearOutputDir>false</clearOutputDir>
          </configuration>
        </execution>
      </executions>
    </plugin>

With the stub classes generated and everything in place, here is the Java DSL JMS message-driven adapter with the JAXB magic:

/**
  * Sample 3: JMS message-driven adapter with JAXB.
  */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
   final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
       new ChannelPublishingJmsMessageListener();
   channelPublishingJmsMessageListener.setExpectReply(false);
   // Unmarshal the XML text message into a JAXB stub before publishing it.
   channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()));
   final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
       new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);

   messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
   return messageDrivenChannelAdapter;
}

@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
   Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
   // Package that holds the generated JAXB stubs.
   marshaller.setContextPath("com.example.stubs");
   return marshaller;
}

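Doing in Java what used to be XML configuration gives you this much power and flexibility. To complete the example, the service activator for inboundChannel looks like this: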

/**
  * Sample 3
  * @param shiporder the unmarshalled ship order
  */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
   System.out.println(shiporder.getOrderid());
   System.out.println(shiporder.getOrderperson());
}

To test the flow, you can send the following XML to the JMS queue via JConsole:

<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="shiporder.xsd">
  <orderperson>John Smith</orderperson>
  <shipto>
    <name>Ola Nordmann</name>
    <address>Langgt 23</address>
    <city>4000 Stavanger</city>
    <country>Norway</country>
  </shipto>
  <item>
    <title>Empire Burlesque</title>
    <note>Special Edition</note>
    <quantity>1</quantity>
    <price>10.90</price>
  </item>
  <item>
    <title>Hide your heart</title>
    <quantity>1</quantity>
    <price>9.90</price>
  </item>
</shiporder>
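Before sending a hand-written message, it can help to check it against the schema, since a message that does not match it cannot be unmarshalled into the stub. The following is a minimal sketch using only the JDK's javax.xml.validation API. ShipOrderValidation is a hypothetical helper name, and the embedded schema is a trimmed-down stand-in for shiporder.xsd (only orderperson and the orderid attribute), an assumption made to keep the example short.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

/** Hypothetical helper that validates an XML message against a schema. */
class ShipOrderValidation {

    // Trimmed-down stand-in for shiporder.xsd, for illustration only.
    static final String XSD =
          "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
        + "<xs:element name='shiporder'><xs:complexType>"
        + "<xs:sequence>"
        + "<xs:element name='orderperson' type='xs:string'/>"
        + "</xs:sequence>"
        + "<xs:attribute name='orderid' type='xs:string' use='required'/>"
        + "</xs:complexType></xs:element>"
        + "</xs:schema>";

    /** Returns true when the XML text is valid against the schema above. */
    static boolean isValid(String xml) {
        try {
            Schema schema = SchemaFactory
                    .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                    .newSchema(new StreamSource(new StringReader(XSD)));
            // With no custom error handler, validate() throws on the first error.
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException | IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String ok = "<shiporder orderid='889923'>"
                  + "<orderperson>John Smith</orderperson></shiporder>";
        String missingId = "<shiporder>"
                  + "<orderperson>John Smith</orderperson></shiporder>";
        System.out.println(isValid(ok));
        System.out.println(isValid(missingId));
    }
}
```

In a real project you would load the full shiporder.xsd from src/main/resources/xsds instead of embedding a shortened copy.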

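Example 4: JMS message-driven channel adapter with JAXB and payload root routing

Another typical situation is to accept XML as a JMS text message, convert it into a JAXB stub, and route the payload to a particular service activator based on the payload root type. The SI Java DSL supports all kinds of routing, of course; I will show you how to route based on the payload type.

First, add the following XSD to the folder that contains shiporder.xsd and name it purchaseorder.xsd: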

<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
       xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
       targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
       elementFormDefault="qualified">
   <xsd:element name="PurchaseOrder">
     <xsd:complexType>
       <xsd:sequence>
         <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
         <xsd:element name="BillTo" type="tns:USAddress"/>
       </xsd:sequence>
       <xsd:attribute name="OrderDate" type="xsd:date"/>
     </xsd:complexType>
   </xsd:element>

   <xsd:complexType name="USAddress">
     <xsd:sequence>
       <xsd:element name="name"   type="xsd:string"/>
       <xsd:element name="street" type="xsd:string"/>
       <xsd:element name="city"   type="xsd:string"/>
       <xsd:element name="state"  type="xsd:string"/>
       <xsd:element name="zip"    type="xsd:integer"/>
     </xsd:sequence>
     <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
   </xsd:complexType>
</xsd:schema>

Then add it to the JAXB Maven plugin configuration:

    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>jaxb2-maven-plugin</artifactId>
      <version>2.3.1</version>
      <executions>
        <execution>
          <id>xjc-schema1</id>
          <goals>
            <goal>xjc</goal>
          </goals>
          <configuration>
            <!-- XSD files to generate sources from. -->
            <sources>
              <source>src/main/resources/xsds/shiporder.xsd</source>
              <source>src/main/resources/xsds/purchaseorder.xsd</source>
            </sources>

            <!-- Package name of the generated sources. -->
            <packageName>com.example.stubs</packageName>
            <outputDirectory>src/main/java</outputDirectory>
            <clearOutputDir>false</clearOutputDir>
          </configuration>
        </execution>
      </executions>
    </plugin>

Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:

@Bean
public Jaxb2Marshaller ordersMarshaller() {
   Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
   marshaller.setContextPath("com.example.stubs");
   return marshaller;
}

/**
  * Sample 4: JMS message-driven adapter with JAXB and payload routing.
  * @return the adapter that publishes unmarshalled orders to inboundChannel
  */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
   final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
       new ChannelPublishingJmsMessageListener();
   channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller()));
   final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
       new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);

   messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
   return messageDrivenChannelAdapter;
}

@Bean
public IntegrationFlow payloadRootMapping() {
   // Route on the payload's runtime class; each mapping handles one stub type.
   return IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m -> m
       .subFlowMapping(Shiporder.class, sf -> sf.handle((MessageHandler) message -> {
         final Shiporder shiporder = (Shiporder) message.getPayload();
         System.out.println(shiporder.getOrderperson());
         System.out.println(shiporder.getOrderid());
       }))
       .subFlowMapping(PurchaseOrder.class, sf -> sf.handle((MessageHandler) message -> {
         final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
         System.out.println(purchaseOrderType.getBillTo().getName());
       }))
   ).get();
}

Note the payloadRootMapping bean. Let's explain the important parts:

<Object, Class<?>>route: the input from inboundChannel is treated as an Object, and routing is performed on its Class<?>.

subFlowMapping(Shiporder.class, ...): handles ship orders.

subFlowMapping(PurchaseOrder.class, ...): handles purchase orders.
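The routing idea itself, picking a handler by the payload's runtime class, can be sketched in plain Java. This is an illustration under my own assumptions and not the DSL's implementation: PayloadTypeRouter, ShipOrderStub and PurchaseOrderStub are hypothetical names, and the two stub classes only stand in for the generated JAXB classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical router keyed by payload class; NOT Spring Integration code. */
class PayloadTypeRouter {
    private final Map<Class<?>, Function<Object, String>> mappings = new HashMap<>();

    /** Registers a handler for exactly one payload type. */
    <T> PayloadTypeRouter mapping(Class<T> type, Function<? super T, String> handler) {
        mappings.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    /** Looks up the handler by the payload's exact runtime class and invokes it. */
    String route(Object payload) {
        Function<Object, String> handler = mappings.get(payload.getClass());
        if (handler == null) {
            throw new IllegalArgumentException(
                    "No mapping for " + payload.getClass().getName());
        }
        return handler.apply(payload);
    }
}

// Stand-ins for the generated JAXB stubs (assumed shapes, illustration only).
class ShipOrderStub {
    final String orderId;
    ShipOrderStub(String orderId) { this.orderId = orderId; }
}

class PurchaseOrderStub {
    final String billToName;
    PurchaseOrderStub(String billToName) { this.billToName = billToName; }
}

class PayloadTypeRouterDemo {
    static PayloadTypeRouter router() {
        return new PayloadTypeRouter()
                .mapping(ShipOrderStub.class, o -> "ship order " + o.orderId)
                .mapping(PurchaseOrderStub.class, o -> "purchase order billed to " + o.billToName);
    }

    public static void main(String[] args) {
        PayloadTypeRouter router = router();
        System.out.println(router.route(new ShipOrderStub("889923")));
        System.out.println(router.route(new PurchaseOrderStub("name1")));
    }
}
```

Because the lookup uses the exact class as the key, a subclass of a mapped type would not match in this sketch; a payload with no mapping is rejected with an exception.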

To test the Shiporder payload, use the XML from example 3; to test the PurchaseOrder payload, use the following XML:

<?xml version="1.0" encoding="UTF-8"?>
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
  <ShipTo country="US">
    <name>name1</name>
    <street>street1</street>
    <city>city1</city>
    <state>state1</state>
    <zip>1</zip>
  </ShipTo>
  <ShipTo country="US">
    <name>name2</name>
    <street>street2</street>
    <city>city2</city>
    <state>state2</state>
    <zip>-79228162514264337593543950335</zip>
  </ShipTo>
  <BillTo country="US">
    <name>name1</name>
    <street>street1</street>
    <city>city1</city>
    <state>state1</state>
    <zip>1</zip>
  </BillTo>
</PurchaseOrder>

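Both payloads should be routed according to the subflow mappings.

Example 5: IntegrationFlowAdapter

Besides the other implementations of the Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. You extend this class and implement the buildFlow method, like this: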

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

  @Autowired
  private ConnectionFactory rabbitConnectionFactory;

  @Override
  protected IntegrationFlowDefinition<?> buildFlow() {
    // Read from the AMQP queue, lower-case the payload, publish to a queue channel.
    return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
         .<String, String>transform(String::toLowerCase)
         .channel(c -> c.queue("myFlowAdapterOutput"));
  }
}

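This lets you wrap repeated bean declarations into a single component and give it the flow it needs. Such a component can then be configured and handed to the calling code as a single class instance!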
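Underneath, this is the familiar template-method idea: a base class owns the plumbing and a subclass supplies only the flow. A minimal plain-Java sketch of that idea follows. FlowAdapterSketch and LowerCaseQueueFlow are hypothetical names, and the sketch does not reproduce how IntegrationFlowAdapter works internally.

```java
import java.util.function.Function;

/** Hypothetical base class: subclasses only describe the flow. */
abstract class FlowAdapterSketch {
    private Function<String, String> flow;

    /** Subclasses define the processing steps once. */
    protected abstract Function<String, String> buildFlow();

    /** Builds the flow on first use and reuses it afterwards. */
    final String process(String payload) {
        if (flow == null) {
            flow = buildFlow();
        }
        return flow.apply(payload);
    }
}

/** One configurable component: the queue name is passed in by the caller. */
class LowerCaseQueueFlow extends FlowAdapterSketch {
    private final String queueName;

    LowerCaseQueueFlow(String queueName) {
        this.queueName = queueName;
    }

    @Override
    protected Function<String, String> buildFlow() {
        Function<String, String> lower = String::toLowerCase;
        // Tag the result with the queue name to show per-instance configuration.
        return lower.andThen(s -> queueName + ": " + s);
    }

    public static void main(String[] args) {
        System.out.println(new LowerCaseQueueFlow("myQueue").process("HELLO"));
    }
}
```

The caller only sees one class to instantiate and configure, which is the point the JmsEndpoint base class makes with real beans.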

So let's make example 3 from this repo a bit shorter by defining a base class for all JMS endpoints, with the repeated beans defined inside it:

public class JmsEndpoint extends IntegrationFlowAdapter {

   private final String queueName;

   private final String channelName;

   private final String contextPath;

   /**
    * @param queueName   name of the JMS queue to consume from
    * @param channelName name of the channel that receives the unmarshalled payloads
    * @param contextPath package that holds the generated JAXB stubs
    */
   public JmsEndpoint(String queueName, String channelName, String contextPath) {
     this.queueName = queueName;
     this.channelName = channelName;
     this.contextPath = contextPath;
   }

   @Override
   protected IntegrationFlowDefinition<?> buildFlow() {
     // Consume from the queue, unmarshal with JAXB, publish to the channel.
     return from(Jms.messageDrivenChannelAdapter(listenerContainer())
       .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
     ).channel(channelName);
   }

   @Bean
   public Jaxb2Marshaller shipOrdersMarshaller() {
     Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
     marshaller.setContextPath(contextPath);
     return marshaller;
   }

   @Bean
   public DynamicDestinationResolver dynamicDestinationResolver() {
     return new DynamicDestinationResolver();
   }

   @Bean
   public ActiveMQConnectionFactory connectionFactory() {
     return new ActiveMQConnectionFactory();
   }

   @Bean
   public DefaultMessageListenerContainer listenerContainer() {
     final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
     defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
     defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
     defaultMessageListenerContainer.setDestinationName(queueName);
     return defaultMessageListenerContainer;
   }

   @Bean
   public MessageChannel inboundChannel() {
     return MessageChannels.direct(channelName).get();
   }
}

Declaring a JMS endpoint for a specific queue is now easy:

@Bean
public JmsEndpoint jmsEndpoint() {
   // Arguments: queue name, channel name, JAXB context path.
   return new JmsEndpoint("jms.activemq.test", "inboundChannel", "com.example.stubs");
}

The service activator for inboundChannel:

/**
  * Sample 3, 5
  * @param shiporder the unmarshalled ship order
  */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
   System.out.println(shiporder.getOrderid());
   System.out.println(shiporder.getOrderperson());
}

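You should not miss out on using IntegrationFlowAdapter in your projects. I like the concept.

I recently started using the Spring Integration Java DSL in a new Spring Boot based project at Embedit. Even though it takes some configuration, I find it very useful.

It is easy to debug, without adding configuration such as a wire tap.

It is much easier to read. Yes, even with lambdas!

It is powerful. In Java configuration you now have many options.

Source code: https://bitbucket.org/tomask79/spring-integration-java-dsl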


Original article: https://www.jdon.com/51378
