spring多数据源一致性事务方法

在一个业务中,在一个事务中处理时候将切换多个数据源,需要保证同一事务多个数据源数据的一致性。下面是spring中的两种配置方法。
本文内容从网络收集整理而成
 spring 多数据源配置

spring 多数据源配置一般有两种方案:

1、在spring项目启动的时候直接配置两个不同的数据源,不同的sessionFactory。在dao 层根据不同业务自行选择使用哪个数据源的session来操作。

2、配置多个不同的数据源,使用一个sessionFactory,在业务逻辑使用的时候自动切换到不同的数据源,有一个种是在拦截器里面根据不同的业务现切换到不同的datasource;有的会在业务层根据业务来自动切换。但这种方案在多线程并发的时候会出现一些问题,需要使用threadlocal等技术来实现多线程竞争切换数据源的问题。

 

【本文暂时只讨论第一种方案】

spring多事务配置主要体现在db配置这块,配置不同的数据源和不同的session

1、一下贴出 spring-db.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="

http://www.springframework.org/schema/beans


http://www.springframework.org/schema/beans/spring-beans-3.0.xsd


http://www.springframework.org/schema/context


http://www.springframework.org/schema/context/spring-context-3.0.xsd

    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

    <bean id="test1DataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" >
        <property name="driverClassName" value="${database.test1.driverClassName}" />
        <property name="url" value="${database.test1.url}" />
        <property name="username" value="${database.test1.username}" />
        <property name="password" value="${database.test1.password}" />
    </bean>
    
    <bean id="test2DataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" >
        <property name="driverClassName" value="${database.test2.driverClassName}" />
        <property name="url" value="${database.test2.url}" />
        <property name="username" value="${database.test2.username}" />
        <property name="password" value="${database.test2.password}" />
    </bean>
    
    <bean id="test1SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test1DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <bean id="test2SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test2DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <bean id="test1TxManager"   class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
        <property name="dataSource" ref="test1DataSource"></property>  
    </bean>  
    <bean id="test2TxManager"   class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
        <property name="dataSource" ref="test2DataSource"></property>  
    </bean>  
  
    <tx:annotation-driven transaction-manager="test2TxManager" />
    <tx:annotation-driven transaction-manager="test1TxManager" />
    
</beans>

2、dao层做了一个小的封装,将不同的SqlSessionFactory 注入到 SessionFactory,通过BaseDao来做简单的封装,封装不同库的基本增删改。dao实现层都集成于Basedao 这样的话,实现可以根据自己需要来选择不同的库来操作不同的内容。

session工厂

package com.neo.dao;

import com.neo.entity.Entity;

public class BaseDao extends SessionFactory{
    
    public void test1Update(Entity entity) {
    this.getTest1Session().update(entity.getClass().getSimpleName()+".update", entity);
    }
    
    public void test2Update(Entity entity) {
       this.getTest2Session().update(entity.getClass().getSimpleName()+".update", entity);
       }
}

BaseDao

package com.neo.dao;

import com.neo.entity.Entity;

public class BaseDao extends SessionFactory{
    
    public void test1Update(Entity entity) {
    this.getTest1Session().update(entity.getClass().getSimpleName()+".update", entity);
    }
    
    public void test2Update(Entity entity) {
       this.getTest2Session().update(entity.getClass().getSimpleName()+".update", entity);
       }
}

 

代码地址:https://github.com/ityouknow/spring-mybatis-mulidatasource/tree/master/spring-mybatis

 

以上的配置在多数据源连接,正常的增删改都是没有问题的,但是遇到分布式的事务是就出问题:

测试代码:

package com.neo.service.impl;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.neo.dao.UserDao;
import com.neo.dao.UserInformationsDao;
import com.neo.entity.UserEntity;
import com.neo.entity.UserInformationsEntity;
import com.neo.service.UserService;

@Service
public class UserServiceImpl implements UserService {
    
    @Resource UserDao userDao;
    @Resource UserInformationsDao userInformationsDao;

    @Override
    @Transactional
    public void updateUserinfo() {
    
    UserEntity user=new UserEntity();
    user.setId(1);
    user.setUserName("李四4");
    
    UserInformationsEntity userInfo=new UserInformationsEntity();
    userInfo.setUserId(1);
    userInfo.setAddress("陕西4");
    
    userDao.updateUser(user);
    userInformationsDao.updateUserInformations(userInfo);
    
    if(true){
        throw new RuntimeException("test tx ");
    }
    }
}

在service添加事务后,更新完毕抛出异常,test2更新进行了回滚,test1 数据更新没有回滚。

解决方案添加分布式的事务,Atomikos和spring结合来处理。

Atomikos多数据源的配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="

http://www.springframework.org/schema/beans


http://www.springframework.org/schema/beans/spring-beans-3.0.xsd


http://www.springframework.org/schema/context


http://www.springframework.org/schema/context/spring-context-3.0.xsd

    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

     <bean id="test1DataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="test1"/>
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
        <property name="xaProperties">
            <props>
                <prop key="url">${database.test1.url}</prop>
                <prop key="user">${database.test1.username}</prop>
                <prop key="password">${database.test1.password}</prop>
            </props>
        </property>
        <property name="minPoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="borrowConnectionTimeout" value="30" />
        <property name="testQuery" value="select 1" />
        <property name="maintenanceInterval" value="60" />
    </bean>
    
     <bean id="test2DataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="test2"/>
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
        <property name="xaProperties">
            <props>
                <prop key="url">${database.test2.url}</prop>
                <prop key="user">${database.test2.username}</prop>
                <prop key="password">${database.test2.password}</prop>
            </props>
        </property>
        <property name="minPoolSize" value="10" />
        <property name="maxPoolSize" value="100" />
        <property name="borrowConnectionTimeout" value="30" />
        <property name="testQuery" value="select 1" />
        <property name="maintenanceInterval" value="60" />
    </bean>
    
    <bean id="test1SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test1DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <bean id="test2SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  
        <property name="dataSource" ref="test2DataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml"></property>
        <property name="mapperLocations" value="classpath*:mybatis/mapper/*.xml" />
    </bean>  
    
    <!-- 分布式事务 -->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
        <property name="forceShutdown" value="true"/>
    </bean>
    
    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="300"/>
    </bean>
    
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager"/>
        <property name="userTransaction" ref="atomikosUserTransaction"/>
    </bean>
    
    <tx:annotation-driven/>
    
</beans>

添加完分布式的项目地址:

https://github.com/ityouknow/spring-mybatis-mulidatasource/tree/master/spring-mybatis-atomikos

上述资料来源: http://www.cnblogs.com/ityouknow/p/4977136.html

 

其中, 文章中的: atomikos 是一个开源的java分布式xa事物项目, 百度百科介绍如下:

Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器。
Atomikos TransactionsEssentials 是一个为Java平台提供增值服务的并且开源类事务管理器,以下是包括在这个开源版本中的一些功能:
l 全面崩溃 / 重启恢复
l 兼容标准的SUN公司JTA API
l 嵌套事务
l 为XA和非XA提供内置的JDBC适配器
注释:XA:XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。
以下的函数使事务管理器可以对资源管理器进行的操作:
1)xa_open,xa_close:建立和关闭与资源管理器的连接。
2)xa_start,xa_end:开始和结束一个本地事务。
3)xa_prepare,xa_commit,xa_rollback:预提交、提交和回滚一个本地事务。
4)xa_recover:回滚一个已进行预提交的事务。
5)ax_开头的函数使资源管理器可以动态地在事务管理器中进行注册,并可以对XID(TRANSACTION IDS)进行操作。
6)ax_reg,ax_unreg;允许一个资源管理器在一个TMS(TRANSACTION MANAGER SERVER)中动态注册或撤消注册。
l 内置的JMS适配器XA-capable JMS队列连接器
注释:JMS:jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
l 通过XA API兼容第三方适配器
l 更好的整合您的项目
l 集成Hibernate
如何使用Atomikos TransactionsEssentials
Atomikos TransactionsEssentials 是一个可靠的库,可以加入到您的Java应用程序,也就是说为了使用这个产品,您必须添加一些jar文件(包括在dist和lib文件夹下)到您的应用程序或者应用程序服务器。
请注意:Atomikos TransactionsEssentials是一个非常快速的嵌入式事务管理器,这就意味着,您不需要另外启动一个单独的事务管理器进程(不要查找任何的bin文件夹)。相反,您的应用服务器将有它自己的intra-VM事务管理器。
配置需求:至少Java1.5 jdk,并且最少128M的内存
性能优化:尽管这个软件有着很大的优势,但是想要更好的发挥其作用,可以按以下的方法优化:
l 更高的内存,意味着更高的吞吐量(每秒的事务数目)
l 使连接池尽可能的大
l 一旦你不需要的连接请马上关闭它们。不要把你的应用程序放在缓存里,让内部连接池为你做这些,这将促使更高效的连接使用
l 不要让活动的事务闲置:终止所有情况下的事务,尤其是在异常报错情况下的事务。这将减少数据库的锁定时间,并且最大效率的处理启用的使用。
如果想获取这些细节的更多信息,也要参阅文档说明部分。
值得注意的是,在我们所有的压力测试中,Atomikos TransactionsEssentials比J2EE的web容器更高效的吞吐量。这些测量值包括日志记录的高效的事务状态,同样,在我们所有的测量中,包括XA和non-XA,高效的效率是一样的。
在J2SE中使用Atomikos Transactions Essentials,只需要按以下步骤
  1. 将idst和lib中的jar包全部放入的项目中
  2. 创建或者自定义你应用的transactions.properties(或者jta.properties)文件(事务管理器的配置),然后将它放入到classpath中,安装文件夹中包涵一个实例文件;在properties文件中注释(#)后面的是默认值,取消一行并且改变默认值。

相关地址:

https://www.atomikos.com/Main/SupportOverview

下面是, 另外的 相关配置, 可以参考一下

注释:XA:XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。

atomikos配合spring的使用方法:1、依赖包
Atomikos的:
transactions-jdbc
transactions-jta
transactions-api
transactions
atomikos-utils
还有一个不要忘了,是jta的包。
用maven要简单一点,只需要加入两个依赖:

Xml代码
  1. <dependency>
  2.     <groupId>com.atomikos</groupId>
  3.     <artifactId>transactions-jdbc</artifactId>
  4.     <version>3.7.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>javax.transaction</groupId>
  8.     <artifactId>jta</artifactId>
  9.     <version>1.1</version>
  10. </dependency>

2、配置数据源
这一步是比较重要的。要用AtomikosDataSourceBean,而不是以前用的连接池如dbcp。最好也用XA(这东西我还不太懂),注意jdbc的链接地址和登陆账号与普通连接池的配置的格式不一样。下面是一个mysql数据库的配置举例:

Xml代码
  1. <bean id="dataSource1" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
  2.     <property name="uniqueResourceName" value="ds1"/>
  3.     <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
  4.     <property name="xaProperties">
  5.         <props>
  6.             <prop key="url">jdbc:mysql://localhost/test</prop>
  7.             <prop key="user">test</prop>
  8.             <prop key="password">test</prop>
  9.         </props>
  10.     </property>
  11.     <property name="minPoolSize" value="10" />
  12.     <property name="maxPoolSize" value="100" />
  13.     <property name="borrowConnectionTimeout" value="30" />
  14.     <property name="testQuery" value="select 1" />
  15.     <property name="maintenanceInterval" value="60" />
  16. </bean>

再来一个sybase的配置举例:

  1. <bean id="dataSource2" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
  2.     <property name="uniqueResourceName" value="ds2"/>
  3.     <property name="xaDataSourceClassName" value="com.sybase.jdbc3.jdbc.SybXADataSource"/>
  4.     <property name="xaProperties">
  5.         <props>
  6.             <prop key="serverName">192.168.1.10</prop>
  7.                         <prop key="portNumber">2638</prop>
  8.                         <prop key="databaseName">test</prop>
  9.             <prop key="user">test</prop>
  10.             <prop key="password">test</prop>
  11.         </props>
  12.     </property>
  13.     <property name="minPoolSize" value="10" />
  14.     <property name="maxPoolSize" value="100" />
  15.     <property name="borrowConnectionTimeout" value="30" />
  16.     <property name="testQuery" value="select 1" />
  17.     <property name="maintenanceInterval" value="60" />
  18. </bean>

3、使用数据源
这一步与平时好像没什么不一样。我做例子的时候是用mybatis,配置如下:

Xml代码
  1. <bean id="sqlSessionFactory1" class="org.mybatis.spring.SqlSessionFactoryBean">
  2.     <property name="dataSource" ref="dataSource1"/>
  3. </bean>
  4. <bean id="sqlSessionFactory2" class="org.mybatis.spring.SqlSessionFactoryBean">
  5.     <property name="dataSource" ref="dataSource2"/>
  6. </bean>

当然,mybatis还要配置一下映射文件的自动扫描,这里与atomikos无关:

  1. <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
  2.     <property name="basePackage" value="xx.xx;" />
  3.     <property name="sqlSessionFactory" ref="sqlSessionFactory1"/>
  4. </bean>
  5. <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
  6.     <property name="basePackage" value="yy.yy;" />
  7.     <property name="sqlSessionFactory" ref="sqlSessionFactory2"/>
  8. </bean>

用spring JdbcTemplate应该与普通使用没什么不同,用hibernate可能会有点不一样,没测试过。

4、配置jta事务管理
这是很关键的一步。原理我不太懂,例子如下:

Xml代码
  1. <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
  2.     <property name="transactionManager">
  3.         <bean class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
  4.             <property name="forceShutdown" value="true"/>
  5.         </bean>
  6.     </property>
  7.     <property name="userTransaction">
  8.         <bean class="com.atomikos.icatch.jta.UserTransactionImp"/>
  9.     </property>
  10. </bean>

当然,用spring的声明式事务配置,再加上一行:

  1. <tx:annotation-driven/>

(注意,本来要配置transaction-manager属性,如:<tx:annotation-driven transaction-manager="transactionManager"/>。这里没有配置是因为它的默认值是transactionManager)

5、atomikos的配置文件jta.properties
这个文件一般放在根路径吧,与log4j.properties类似。jta.properties也可命名为transactions.properties。如果不配置这个文件,项目也能启动,因为几乎所有配置项都有默认值。最好还是配置了,详细配置信息请查看:http://www.atomikos.com/Documentation/JtaProperties。

6、不管是用JdbcTemplate、mybatis还是hibernate,应该都可以写代码来测试了。。。 

 

另外, 常用xa事物项目信息

本地事务和分布式事务。

本地事务:只处理单一数据源,比如单个数据库下,事务进行控制。
分布式事务:处理多种异构的数据源, 比如某个业务操作中同时包含JDBC和JMS或者某个操作需要访问多个不同的数据库,在不同数据库之间进行事务控制。

在Java中,分布式事务主要的规范是JTA/XA。其中:JTA是Java的事务管理器规范, XA是工业标准的X/Open CAE规范,可被两阶段提交及回滚的事务资源定义。比如某数据库实现了XA规范,则不管是JTA,还是MSDTC,都可以基于同样的行为对该数据库进行事务处理。

JTA全称为Java Transaction API,顾名思义JTA定义了一组统一的事务编程的接口,这些接口如下:
XAResource :XAResource接口是对实现了X/Open CAE规范的资源管理器 (Resource Manager,数据库就是典型的资源管理器) 的抽象,它由资源适配器 (Resource Apdater) 提供实现。XAResource是支持事务控制的核心。
Transaction:Transaction接口是一个事务实例的抽象,通过它可以控制事务内多个资源的提交或者回滚。二阶段提交过程也是由Transaction接口的实现者来完成的。
TransactionManager:托管模式 (managed mode) 下,TransactionManager接口是被应用服务器调用,以控制事务的边界的。
UserTransaction:非托管模式 (non-managed mode) 下,应用程序可以通过UserTransaction接口控制事务的边界

在tomcat下是没有分布式事务的,可以借助于第三方Jotm和Automikos实现,在spring中分布式事务是通过jta(jotm,atomikos)来进行实现 。即:通过代码的方式来决定是否是分布式事务。

注:推荐用服务器自己的数据源(也就是 lookup JNDI),这样的话,是不是 XA 事务就由服务器的配置来定制,代码就不需要任何配置来决定是不是 XA 了 。事务本身是不是 XA (分布式的)是服务器的事,服务器来管理“资源” (包括数据源,JMS 连接等,一个资源(JDBC连接)如何参与事务是“资源管理器”(驱动程序)的职责,跟程序无关),服务器提供事务管理并作为“事务协调者”来处理多个“资源管理器”(不同的数据库连接)之间的事务一致性。

jotm和automikos网址:
1、http://jotm.objectweb.org/
2、http://www.atomikos.com/Main/TransactionsEssentials

JTA的实现框架有:
GeronimoTM/Jencks  官方文档比较少,不适合学习和维护。
SimpleJTA 没有实现JTS (Java Transaction Service)而且不是活跃的。
Atomikos  是一个另人钦佩的产品。有丰富的文档,而且有很好的支持。
JBossTS  是一个应用在JBOSS服务器上的,肯定是一个成熟的产品,也有好的支持,详细信息可以看这里:http://www.theserverside.com/news  /thread.tss?thread_id=37941

最常见的二个如下:
JOTM
JOTM(Java Open Transaction Manager)是ObjectWeb的一个开源JTA实现,它本身也是开源应用程序服务器JOnAS(Java Open Application Server)的一部分,为其提供JTA分布式事务的功能。
存在的问题:使用中不能自动rollback,无论什么情况都commit。注:spring3开始已经不再支持jotm

Atomikos
大家推荐最多的。和JOTM相比Atomikos Transactions Essentials更加稳定,它原来是商业项目,现在开源了。象MySQL一样卖服务支持的。而且论坛页比较活跃,有问题很快可以解决。

发表评论