分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)
分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)
在 本系列 先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。
请通过以下方式下载github源代码:
git clone https: // github测试数据/davenkin/jta-atomikos-hibernate-activemq.git
本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。
Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过 这篇文章 了解相关知识。
在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。
以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:
package davenkin;
public interface OrderService {
public void makeOrder(Order order);
}
为了简单起见,我们设计一个非常简单的领域对象Order:
@XmlRootElement(name = "Order" )
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {
@XmlElement(name = "Id",required = true )
private long id;
@XmlElement(name = "ItemName",required = true )
private String itemName;
@XmlElement(name = "Price",required = true )
private double price;
@XmlElement(name = "BuyerName",required = true )
private String buyerName;
@XmlElement(name = "MailAddress",required = true )
private String mailAddress;
public Order() {
}
为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的形式发送到物流部门的ORDER.QUEUE中。
(一)准备数据库
为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。
SQL脚本包含在createDB.sql文件中:
CREATE TABLE USER_ORDER( ID INT NOT NULL , ITEM_NAME VARCHAR ( 100 ) NOT NULL UNIQUE , PRICE DOUBLE NOT NULL , BUYER_NAME CHAR ( 32 ) NOT NULL , MAIL_ADDRESS VARCHAR ( 500 ) NOT NULL , PRIMARY KEY (ID) );
在Spring中配置DataSource如下:
< jdbc:embedded-database id ="dataSource" >
< jdbc:script location ="classpath:createDB.sql" />
</ jdbc:embedded-database >
(二)启动ActiveMQ
我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。
@BeforeClass
public static void startEmbeddedActiveMq() throws Exception {
broker = new BrokerService();
broker.addConnector( "tcp://localhost:61616" );
broker.start();
}
@AfterClass
public static void stopEmbeddedActiveMq() throws Exception {
broker.stop();
}
(三)实现OrderService
创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。
package davenkin;
import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;
public class DefaultOrderService implements OrderService{
private JmsTemplate jmsTemplate;
private SessionFactory sessionFactory;
@Override
@Transactional
public void makeOrder(Order order) {
Session session = sessionFactory.getCurrentSession();
session.save(order);
jmsTemplate.convertAndSend(order);
}
@Required
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this .jmsTemplate = jmsTemplate;
}
@Required
public void setSessionFactory(SessionFactory sessionFactory) {
this .sessionFactory = sessionFactory;
}
}
(四)创建Order的Mapping配置文件
<? xml version="1.0" ?>
<! DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
"http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd" >
< hibernate-mapping >
< class name ="davenkin.Order" table ="USER_ORDER" >
< id name ="id" type ="long" >
< column name ="ID" />
< generator class ="increment" />
</ id >
< property name ="itemName" type ="string" >
< column name ="ITEM_NAME" />
</ property >
< property name ="price" type ="double" >
< column name ="PRICE" />
</ property >
< property name ="buyerName" type ="string" >
< column name ="BUYER_NAME" />
</ property >
< property name ="mailAddress" type ="string" >
< column name ="MAIL_ADDRESS" />
</ property >
</ class >
</ hibernate-mapping >
(五)配置Atomikos事务
在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:
< bean id ="userTransactionService" class ="com.atomikos.icatch.config.UserTransactionServiceImp" init-method ="init" destroy-method ="shutdownForce" >
< constructor-arg >
< props >
< prop key ="com.atomikos.icatch.service" > com.atomikos.icatch.standalone.UserTransactionServiceFactory </ prop >
</ props >
</ constructor-arg >
</ bean >
< bean id ="atomikosTransactionManager" class ="com.atomikos.icatch.jta.UserTransactionManager" init-method ="init" destroy-method ="close" depends-on ="userTransactionService" >
< property name ="forceShutdown" value ="false" />
</ bean >
< bean id ="atomikosUserTransaction" class ="com.atomikos.icatch.jta.UserTransactionImp" depends-on ="userTransactionService" >
< property name ="transactionTimeout" value ="300" />
</ bean >
< bean id ="jtaTransactionManager" class ="org.springframework.transaction.jta.JtaTransactionManager" depends-on ="userTransactionService" >
< property name ="transactionManager" ref ="atomikosTransactionManager" />
< property name ="userTransaction" ref ="atomikosUserTransaction" />
</ bean >
< tx:annotation-driven transaction-manager ="jtaTransactionManager" />
(六)配置JMS
对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order对象和XML之间互转。
< bean id ="jmsXaConnectionFactory" class ="org.apache.activemq.ActiveMQXAConnectionFactory" >
< property name ="brokerURL" value ="tcp://localhost:61616" />
</ bean >
< bean id ="amqConnectionFactory" class ="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method ="init" >
< property name ="uniqueResourceName" value ="XAactiveMQ" />
< property name ="xaConnectionFactory" ref ="jmsXaConnectionFactory" />
< property name ="poolSize" value ="5" />
</ bean >
< bean id ="jmsTemplate" class ="org.springframework.jms.core.JmsTemplate" >
< property name ="connectionFactory" ref ="amqConnectionFactory" />
< property name ="receiveTimeout" value ="2000" />
< property name ="defaultDestination" ref ="orderQueue" />
< property name ="sessionTransacted" value ="true" />
< property name ="messageConverter" ref ="oxmMessageConverter" />
</ bean >
< bean id ="orderQueue" class ="org.apache.activemq测试数据mand.ActiveMQQueue" >
< constructor-arg value ="ORDER.QUEUE" />
</ bean >
< bean id ="oxmMessageConverter"
class ="org.springframework.jms.support.converter.MarshallingMessageConverter" >
< property name ="marshaller" ref ="marshaller" />
< property name ="unmarshaller" ref ="marshaller" />
</ bean >
< oxm:jaxb2-marshaller id ="marshaller" >
< oxm:class-to-be-bound name ="davenkin.Order" />
</ oxm:jaxb2-marshaller >
(七)测试
在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:
@Test
public void makeOrder(){
orderService.makeOrder(createOrder());
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
assertEquals( 1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER" ));
String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String. class );
String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();
assertEquals(dbItemName, messageItemName);
}
@Test(expected = IllegalArgumentException. class )
public void failToMakeOrder()
{
orderService.makeOrder( null );
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
assertEquals( 0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER" ));
assertNull(jmsTemplate.receiveAndConvert());
}
标签: java , java ee , j2ee , 事务 , 数据库
作者: Leo_wl
出处: http://HdhCmsTestcnblogs测试数据/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息查看更多关于分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)的详细内容...