好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)

分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)

分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)

在 本系列 先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。

请通过以下方式下载github源代码:

git clone  https: //  github.com/davenkin/jta-atomikos-hibernate-activemq.git  

本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。

Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过 这篇文章 了解相关知识。

在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。

以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:

 package   davenkin;

  public   interface   OrderService {

      public   void   makeOrder(Order order);
} 

为了简单起见,我们设计一个非常简单的领域对象Order:

@XmlRootElement(name = "Order" )
@XmlAccessorType(XmlAccessType.FIELD)
  public   class   Order {

    @XmlElement(name  = "Id",required =  true  )
      private   long   id;

    @XmlElement(name  = "ItemName",required =  true  )
      private   String itemName;

    @XmlElement(name  = "Price",required =  true  )
      private   double   price;

    @XmlElement(name  = "BuyerName",required =  true  )
      private   String buyerName;

    @XmlElement(name  = "MailAddress",required =  true  )
      private   String mailAddress;

      public   Order() {
    } 

为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的形式发送到物流部门的ORDER.QUEUE中。

(一)准备数据库

为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。

SQL脚本包含在createDB.sql文件中:

 CREATE   TABLE   USER_ORDER(
ID   INT   NOT   NULL  ,
ITEM_NAME   VARCHAR  ( 100 )  NOT   NULL   UNIQUE  ,
PRICE   DOUBLE   NOT   NULL  ,
BUYER_NAME   CHAR  ( 32 )  NOT   NULL  ,
MAIL_ADDRESS   VARCHAR ( 500 )  NOT   NULL  ,
  PRIMARY   KEY  (ID)
); 

在Spring中配置DataSource如下:

     <  jdbc:embedded-database   id  ="dataSource"  > 
         <  jdbc:script   location  ="classpath:createDB.sql"  /> 
     </  jdbc:embedded-database  > 

(二)启动ActiveMQ

我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。

   @BeforeClass
      public   static   void  startEmbeddedActiveMq()  throws   Exception {
        broker  =  new   BrokerService();
        broker.addConnector( "tcp://localhost:61616" );
        broker.start();
    }

    @AfterClass
      public   static   void  stopEmbeddedActiveMq()  throws   Exception {
        broker.stop();
    } 

(三)实现OrderService

创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。

 package   davenkin;

  import   org.hibernate.SessionFactory;
  import   org.hibernate.classic.Session;
  import   org.springframework.beans.factory.annotation.Required;
  import   org.springframework.jms.core.JmsTemplate;
  import   org.springframework.transaction.annotation.Transactional;

  public   class  DefaultOrderService   implements   OrderService{
      private   JmsTemplate jmsTemplate;
      private   SessionFactory sessionFactory;

    @Override
    @Transactional
      public   void   makeOrder(Order order) {
        Session session  =  sessionFactory.getCurrentSession();
        session.save(order);
        jmsTemplate.convertAndSend(order);

    }

    @Required
      public   void   setJmsTemplate(JmsTemplate jmsTemplate) {
          this .jmsTemplate =  jmsTemplate;
    }

    @Required
      public   void   setSessionFactory(SessionFactory sessionFactory) {
          this .sessionFactory =  sessionFactory;
    }
} 

(四)创建Order的Mapping配置文件

 <?  xml version="1.0"  ?> 
 <!  DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
        "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd"  > 

 <  hibernate-mapping  > 
     <  class   name  ="davenkin.Order"   table  ="USER_ORDER"  > 
         <  id   name  ="id"   type  ="long"  > 
             <  column   name  ="ID"   /> 
             <  generator   class  ="increment"   /> 
         </  id  > 
         <  property   name  ="itemName"   type  ="string"  > 
             <  column   name  ="ITEM_NAME"   /> 
         </  property  > 
         <  property   name  ="price"   type  ="double"  > 
             <  column   name  ="PRICE"  /> 
         </  property  > 
         <  property   name  ="buyerName"   type  ="string"  > 
             <  column   name  ="BUYER_NAME"  /> 
         </  property  > 
         <  property   name  ="mailAddress"   type  ="string"  > 
             <  column   name  ="MAIL_ADDRESS"  /> 
         </  property  > 
     </  class  > 
 </  hibernate-mapping  > 

(五)配置Atomikos事务

在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:

   <  bean   id  ="userTransactionService"   class  ="com.atomikos.icatch.config.UserTransactionServiceImp"   init-method  ="init"   destroy-method  ="shutdownForce"  > 
         <  constructor-arg  > 
             <  props  > 
                 <  prop   key  ="com.atomikos.icatch.service"  > com.atomikos.icatch.standalone.UserTransactionServiceFactory </  prop  > 
             </  props  > 
         </  constructor-arg  > 
     </  bean  > 

     <  bean   id  ="atomikosTransactionManager"   class  ="com.atomikos.icatch.jta.UserTransactionManager"   init-method  ="init"   destroy-method  ="close"   depends-on  ="userTransactionService"  > 
         <  property   name  ="forceShutdown"   value  ="false"   /> 
     </  bean  > 

     <  bean   id  ="atomikosUserTransaction"   class  ="com.atomikos.icatch.jta.UserTransactionImp"   depends-on  ="userTransactionService"  > 
         <  property   name  ="transactionTimeout"   value  ="300"   /> 
     </  bean  > 

     <  bean   id  ="jtaTransactionManager"   class  ="org.springframework.transaction.jta.JtaTransactionManager"   depends-on  ="userTransactionService"  > 
         <  property   name  ="transactionManager"   ref  ="atomikosTransactionManager"   /> 
         <  property   name  ="userTransaction"   ref  ="atomikosUserTransaction"   /> 
     </  bean  > 

     <  tx:annotation-driven   transaction-manager  ="jtaTransactionManager"   /> 

(六)配置JMS

对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order对象和XML之间互转。

     <  bean   id  ="jmsXaConnectionFactory"   class  ="org.apache.activemq.ActiveMQXAConnectionFactory"  > 
         <  property   name  ="brokerURL"   value  ="tcp://localhost:61616"   /> 
     </  bean  > 

     <  bean   id  ="amqConnectionFactory"   class  ="com.atomikos.jms.AtomikosConnectionFactoryBean"   init-method  ="init"  > 
         <  property   name  ="uniqueResourceName"   value  ="XAactiveMQ"   /> 
         <  property   name  ="xaConnectionFactory"   ref  ="jmsXaConnectionFactory"   /> 
         <  property   name  ="poolSize"   value  ="5"  /> 
     </  bean  > 


     <  bean   id  ="jmsTemplate"   class  ="org.springframework.jms.core.JmsTemplate"  > 
         <  property   name  ="connectionFactory"   ref  ="amqConnectionFactory"  /> 
         <  property   name  ="receiveTimeout"   value  ="2000"   /> 
         <  property   name  ="defaultDestination"   ref  ="orderQueue"  /> 
         <  property   name  ="sessionTransacted"   value  ="true"   /> 
         <  property   name  ="messageConverter"   ref  ="oxmMessageConverter"  /> 
     </  bean  > 

     <  bean   id  ="orderQueue"   class  ="org.apache.activemq.command.ActiveMQQueue"  > 
         <  constructor-arg   value  ="ORDER.QUEUE"  /> 
     </  bean  > 


     <  bean   id  ="oxmMessageConverter"  
          class  ="org.springframework.jms.support.converter.MarshallingMessageConverter"  > 
         <  property   name  ="marshaller"   ref  ="marshaller"  /> 
         <  property   name  ="unmarshaller"   ref  ="marshaller"  /> 
     </  bean  > 

     <  oxm:jaxb2-marshaller   id  ="marshaller"  > 
         <  oxm:class-to-be-bound   name  ="davenkin.Order"  /> 
     </  oxm:jaxb2-marshaller  > 

(七)测试

在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:

    @Test
      public   void   makeOrder(){
        orderService.makeOrder(createOrder());
        JdbcTemplate jdbcTemplate  =  new   JdbcTemplate(dataSource);
        assertEquals( 1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER" ));
        String dbItemName  = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String. class  );
        String messageItemName  =  ((Order) jmsTemplate.receiveAndConvert()).getItemName();
        assertEquals(dbItemName, messageItemName);
    }

    @Test(expected  = IllegalArgumentException. class  )
      public   void   failToMakeOrder()
    {
        orderService.makeOrder(  null  );
        JdbcTemplate jdbcTemplate  =  new   JdbcTemplate(dataSource);
        assertEquals( 0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER" ));
        assertNull(jmsTemplate.receiveAndConvert());
    } 

 

 

 

标签:  java ,  java ee ,  j2ee ,  事务 ,  数据库

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)的详细内容...

  阅读:38次