SSM框架配置读写分离,一写多读的思路与实践
@(ssm)[数据库|读写分离|一写多读]
代码下载地址github:https://github.com/DearAS/SSM-Databases.git
事务是前提
一个项目中要使用多个数据库首先应该想到的问题是事务的问题,因为事务是建立在数据库的基础上的,如果在事务开启时和事务关闭时操作的数据库不是一个,那么这个事务也就不起作用了。所以要使事务起作用,我们的多个数据库就必须在事务开启之前确定好当前操作要用哪个数据库,所以说事务是前提。
前奏
要实现读写分离需要确定两点事情:
- 根据什么来判断使用读库还是写库。(可以通过方法名、注解、或者事务的readOnly属性等来判断都可以)
- 读库写库怎样进行实时切换。
在JAVA SSM项目中要是写读写分离可以有两种思路:
- 配置两种mybatis的sqlSessionFactory,不同的sqlSessionFactory对应不同的数据源。通过配置这两个sqlSessionFactory的sql映射文件,来使mybatis自动实现读写分离。
- 利用Spring的AOP进行代码的横向切割。在选择声明式事务切入之前就确定好要使用的数据源。
本文通过三种方式来实现ssm项目中的读写分离:推荐使用第三种方式
具体实现
方式一
这种方式比较简单:通过多个配置mybatis的sqlSessionFactory,每个sqlSessionFactory对应不同的事务管理器和事务切入点。
从而达到了不同的事务切入点 使用不同的数据源,通过配置这两个sqlSessionFactory的sql映射文件,来使mybatis自动实现读写分离。
但是这种做法的缺点显而易见:代码改动较大,不易维护,不易扩展。
需要确定的两点:
1.利用sql映射文件xml来判断使用的是读库还是写库。
2.每种不同的sqlSessionFactory对应不同的数据源,实现了数据源的切换。
|
|
这种方式只是一个思路,不建议使用,所以只给出了配置文件。可以利用注释中的内容完成实际开发。
方式二
利用Spring的AOP实现数据源的动态切换。利用方法名上打自定义注解的方式来判断具体要使用的数据源(如:@DataSource(value=”read”)),然后利用如AbstractRoutingDataSource实现数据源的切换。
Spring提供了很多实现这一需求的口,如AbstractRoutingDataSource,只要继承了AbstractRoutingDataSource,然后重写determineCurrentLookupKey方法返回数据源的名字即可动态切换数据源。
在determineCurrentLookupKey()之前就利用切面确定好数据源的名称,可以在在determineCurrentLookupKey()中操作轮询算法实现多读配置。
需要确定的两点:
- 利用方法名上的注解来确定是用读库还是写库。
- 利用Spring提供的重写AbstractRoutingDataSource的determineCurrentLookupKey方法,它返回的是配置文件中的dataSource对应的名称,然后Spring根据不同的名称选取不同的数据源。
本方式主要是
spring-context.xml配置文件详解
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:tx="http://www.springframework.org/schema/tx"xmlns:aop="http://www.springframework.org/schema/aop"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsdhttp://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx-4.1.xsdhttp://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop-4.1.xsd"><bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close"><property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/><!-- 配置获取连接等待超时的时间 --><property name="maxWait" value="60000"/><!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 --><property name="timeBetweenEvictionRunsMillis" value="60000"/><!-- 配置一个连接在池中最小生存的时间,单位是毫秒 --><property name="minEvictableIdleTimeMillis" value="300000"/><property name="validationQuery" value="SELECT 'x'"/><property name="testWhileIdle" value="true"/><property name="testOnBorrow" value="false"/><property name="testOnReturn" value="false"/><!-- 打开PSCache,并且指定每个连接上PSCache的大小 --><property name="poolPreparedStatements" value="true"/><property name="maxPoolPreparedStatementPerConnectionSize" value="20"/><property name="filters" value="config"/><property name="connectionProperties" value="config.decrypt=true" /></bean><bean id="dataSourceRead1" parent="abstractDataSource"><property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/><!-- 基本属性 url、user、password --><property name="url" value="${read1.jdbc.url}"/><property name="username" value="${read1.jdbc.user}"/><property name="password" value="${read1.jdbc.password}"/><!-- 配置初始化大小、最小、最大 --><property name="initialSize" value="${read1.jdbc.initPoolSize}"/><property name="minIdle" value="${read1.jdbc.minPoolSize}"/><property name="maxActive" value="${read1.jdbc.maxPoolSize}"/></bean><bean id="dataSourceRead2" parent="abstractDataSource"><property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/><!-- 基本属性 url、user、password --><property name="url" value="${read2.jdbc.url}"/><property name="username" value="${read2.jdbc.user}"/><property name="password" value="${read2.jdbc.password}"/><!-- 配置初始化大小、最小、最大 --><property name="initialSize" value="${read2.jdbc.initPoolSize}"/><property name="minIdle" value="${read2.jdbc.minPoolSize}"/><property name="maxActive" value="${read2.jdbc.maxPoolSize}"/></bean><bean id="dataSourceWrite" parent="abstractDataSource"><property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/><!-- 基本属性 url、user、password --><property name="url" value="${write.jdbc.url}"/><property name="username" value="${write.jdbc.user}"/><property name="password" value="${write.jdbc.password}"/><!-- 配置初始化大小、最小、最大 --><property name="initialSize" value="${write.jdbc.initPoolSize}"/><property name="minIdle" value="${write.jdbc.minPoolSize}"/><property name="maxActive" value="${write.jdbc.maxPoolSize}"/></bean><bean id="dataSource" class="io.dearas.datasource2.DynamicDataSource"><property name="writeDataSource" ref="dataSourceWrite" /><property name="readDataSources"><list><ref bean="dataSourceRead1" /><ref bean="dataSourceRead2" /></list></property><!--轮询方式 1为轮询,其它为随机--><property name="readDataSourcePollPattern" value="1" /><property name="defaultTargetDataSource" ref="dataSourceWrite"/></bean><tx:annotation-driven transaction-manager="transactionManager"/><bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"><property name="dataSource" ref="dataSource"/></bean><!-- 针对myBatis的配置项 --><!-- 配置sqlSessionFactory --><bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"><!-- 实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件 --><property name="dataSource" ref="dataSource"/><property name="mapperLocations" value="classpath:mapper/*.xml"/></bean><!-- 配置扫描器 --><bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"><!-- 扫描包以及它的子包下的所有映射接口类 --><property name="basePackage" value="com.test.api.dao.inte"/><property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/></bean><!-- 配置数据库注解aop --><bean id="dynamicDataSourceAspect" class="io.dearas.datasource2.DynamicDataSourceAspect" /><!--配置事务管理,设置所有的读库为read-only="true" 在数据库切换中通过read-only进行判断是否使用读库 --><tx:advice id="txAdvice" transaction-manager="transactionManager"><tx:attributes><tx:method name="save*" propagation="REQUIRED" /><tx:method name="add*" propagation="REQUIRED" /><tx:method name="create*" propagation="REQUIRED" /><tx:method name="insert*" propagation="REQUIRED" /><tx:method name="update*" propagation="REQUIRED" /><tx:method name="merge*" propagation="REQUIRED" /><tx:method name="del*" propagation="REQUIRED" /><tx:method name="remove*" propagation="REQUIRED" /><tx:method name="query*" read-only="true"/><tx:method name="use*" read-only="true"/><tx:method name="get*" read-only="true" /><tx:method name="count*" read-only="true" /><tx:method name="find*" read-only="true" /><tx:method name="list*" read-only="true" /><tx:method name="*" propagation="REQUIRED"/></tx:attributes></tx:advice><aop:config expose-proxy="true"><!-- 只对业务逻辑层实施事务 --><aop:pointcut id="txPointcut" expression="execution(* io.dearas.servie.*(..))" /><aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/><!-- 通过AOP切面实现读/写库选择 order 用于定义与事务的切割顺序,order越小 越先被执行。保证在事务之前就已经进行了数据源的切换--><aop:aspect id="c" order="-2147483648" ref="dynamicDataSourceAspect"><aop:before pointcut-ref="txPointcut" method="before"/><aop:after pointcut-ref="txPointcut" method="after"/></aop:aspect></aop:config></beans>动态数据源实现
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697package io.dearas.datasource2;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.atomic.AtomicLong;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 动态数据源实现读写分离*/public class DynamicDataSource extends AbstractRoutingDataSource {private Object writeDataSource; //写数据源private List<Object> readDataSources; //多个读数据源private int readDataSourceSize; //读数据源个数private int readDataSourcePollPattern = 0; //获取读数据源方式,0:随机,1:轮询private AtomicLong counter = new AtomicLong(0);private static final Long MAX_POOL = Long.MAX_VALUE;private final Lock lock = new ReentrantLock();public void afterPropertiesSet() {if (this.writeDataSource == null) {throw new IllegalArgumentException("Property 'writeDataSource' is required");}setDefaultTargetDataSource(writeDataSource);Map<Object, Object> targetDataSources = new HashMap<>();targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);if (this.readDataSources == null) {readDataSourceSize = 0;} else {for(int i=0; i<readDataSources.size(); i++) {targetDataSources.put(DynamicDataSourceGlobal.READ.name() + i, readDataSources.get(i));}readDataSourceSize = readDataSources.size();}setTargetDataSources(targetDataSources);super.afterPropertiesSet();}protected Object determineCurrentLookupKey() {DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();if(dynamicDataSourceGlobal == null|| dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE|| readDataSourceSize <= 0) {return DynamicDataSourceGlobal.WRITE.name();}int index = 1;if(readDataSourcePollPattern == 1) {//轮询方式long currValue = counter.incrementAndGet();if((currValue + 1) >= MAX_POOL) {try {lock.lock();if((currValue + 1) >= MAX_POOL) {counter.set(0);}} finally {lock.unlock();}}index = (int) (currValue % readDataSourceSize);} else {//随机方式index = ThreadLocalRandom.current().nextInt(0, readDataSourceSize);}return dynamicDataSourceGlobal.name() + index;}public void setWriteDataSource(Object writeDataSource) {this.writeDataSource = writeDataSource;}public void setReadDataSources(List<Object> readDataSources) {this.readDataSources = readDataSources;}public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {this.readDataSourcePollPattern = readDataSourcePollPattern;}}spring AOP切面
1234567891011121314151617181920212223242526272829303132333435363738package io.dearas.datasource2;import org.apache.log4j.Logger;import org.aspectj.lang.JoinPoint;import org.aspectj.lang.reflect.MethodSignature;import java.lang.reflect.Method;/*** 定义选择数据源切面*/public class DynamicDataSourceAspect {private static final Logger logger = Logger.getLogger(DynamicDataSourceAspect.class);public void pointCut(){};public void before(JoinPoint point){Object target = point.getTarget();String methodName = point.getSignature().getName();Class<?>[] clazz = target.getClass().getInterfaces();Class<?>[] parameterTypes = ((MethodSignature) point.getSignature()).getMethod().getParameterTypes();try {Method method = clazz[0].getMethod(methodName, parameterTypes);if (method != null && method.isAnnotationPresent(DataSource.class)) {DataSource data = method.getAnnotation(DataSource.class);DynamicDataSourceHolder.putDataSource(data.value());}} catch (Exception e) {logger.error(String.format("Choose DataSource error, method:%s, msg:%s", methodName, e.getMessage()));}}public void after(JoinPoint point) {DynamicDataSourceHolder.clearDataSource();}}一些其它的辅助类
1234567891011121314package io.dearas.datasource2;import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;(RetentionPolicy.RUNTIME)(ElementType.METHOD)public DataSource {public DynamicDataSourceGlobal value() default DynamicDataSourceGlobal.READ;}
|
|
|
|
轮询算法的讲解
方式三
利用Spring的AOP实现数据源的动态切换。
除了可以利用如AbstractRoutingDataSource之外还可以利用AbstractDataSource来重写getConnection()方法,来实现数据源的读写分离和动态切换。根据事务的配置,如果不是readOnly方法则使用写库,如果是readOnly的方法,再进行后续的逻辑判断。
如配置之前是写操作,则本次也使用写库进行读取。
需要确定的两点:
- 根据事务的readOnly判断使用读库还是写库。
- 继承AbstarctDataSource重写getConnection()方法,通过返回不同的连接来进行数据源的切换。
此方式实现过程可能比较复杂,整体流程分为以下步骤:
1、spring容器加载完成之后,把依赖注入的map中的key添加到String[] readDataSourceNames这个数组中,代表读库的名称。map中的value添加到DataSource[] readDataSources这个数组中,代表读库的具体数据源。
2、spring容器中每个bean对象被初始化完成之后(因为bean被初始化完成之后,AOP切面已经切入进去,所以可以获取到切面中的内容):如果这个bean对象是有事务处理的则进行判断事务的类型来决定具体使用读库还是写库。
如果是读库则把方法名添加到map中,key为方法名,value为true 表示这个方法使用读库,如果是写库则使用false。
这个步骤最终把所有的方法名及使用的数据源类型都添加到了一个map中。当真正调用的时候根据方法名在map中查找,最后确定要使用的数据源。
3、当真正调用方法的时候AOP中的切面做的事情:根据当前的方法名判断要使用的是读库还是写库,进行一个标记。
4、因为确定数据源切入点要比事务的切入先执行(通过配置文件中的order属性进行控制),所以在步骤3执行完毕之后,开启事务等要获取数据库连接就会调用getConnection()方法。
因为我们又对getConnection()方法进行了重写,所以在重写getConnection()中使用具体的数据源连接池获取具体的connection。从而达到了数据库连接的切换。还可以在getConnection中用轮询算法进行多读库的使用。非常灵活。
spring-context.xml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:context="http://www.springframework.org/schema/context"xmlns:jdbc="http://www.springframework.org/schema/jdbc"xmlns:mybatis-spring="http://mybatis.org/schema/mybatis-spring"xmlns:tx="http://www.springframework.org/schema/tx"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsdhttp://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.2.xsdhttp://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring-1.2.xsdhttp://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd"><context:property-placeholderlocation="classpath:dbconfig.properties" /><!--此配置为配置数据库读写分离 实现多读库--><!-- 数据源 读 --><bean id="readDataSource1" class="com.alibaba.druid.pool.DruidDataSource"init-method="init" destroy-method="close"><property name="url" value="${jdbc_url}" /><property name="username" value="${jdbc_username}" /><property name="password" value="${jdbc_password}" /><!-- 初始化连接大小 --><property name="initialSize" value="2" /><!-- 连接池最大使用连接数量 --><property name="maxActive" value="1000" /><!-- 连接池最小空闲 --><property name="minIdle" value="2" /><!-- 获取连接最大等待时间 --><property name="maxWait" value="60000" /><!-- <property name="poolPreparedStatements" value="true" /> <propertyname="maxPoolPreparedStatementPerConnectionSize" value="33" /> --><property name="validationQuery" value="${validationQuery}" /><property name="testOnBorrow" value="false" /><property name="testOnReturn" value="false" /><property name="testWhileIdle" value="true" /><!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 --><property name="timeBetweenEvictionRunsMillis" value="60000" /><!-- 配置一个连接在池中最小生存的时间,单位是毫秒 --><property name="minEvictableIdleTimeMillis" value="25200000" /><!-- 打开removeAbandoned功能 --><property name="removeAbandoned" value="true" /><!-- 1800秒,也就是30分钟 --><property name="removeAbandonedTimeout" value="1800" /><!-- 关闭abanded连接时输出错误日志 --><property name="logAbandoned" value="true" /><!-- 监控数据库 --><!-- <property name="filters" value="stat" /> --><property name="filters" value="mergeStat" /></bean><!-- 数据源 读 --><bean id="readDataSource2" class="com.alibaba.druid.pool.DruidDataSource"init-method="init" destroy-method="close"><property name="url" value="${jdbc_url}" /><property name="username" value="${jdbc_username}" /><property name="password" value="${jdbc_password}" /><!-- 初始化连接大小 --><property name="initialSize" value="2" /><!-- 连接池最大使用连接数量 --><property name="maxActive" value="1000" /><!-- 连接池最小空闲 --><property name="minIdle" value="2" /><!-- 获取连接最大等待时间 --><property name="maxWait" value="60000" /><!-- <property name="poolPreparedStatements" value="true" /> <propertyname="maxPoolPreparedStatementPerConnectionSize" value="33" /> --><property name="validationQuery" value="${validationQuery}" /><property name="testOnBorrow" value="false" /><property name="testOnReturn" value="false" /><property name="testWhileIdle" value="true" /><!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 --><property name="timeBetweenEvictionRunsMillis" value="60000" /><!-- 配置一个连接在池中最小生存的时间,单位是毫秒 --><property name="minEvictableIdleTimeMillis" value="25200000" /><!-- 打开removeAbandoned功能 --><property name="removeAbandoned" value="true" /><!-- 1800秒,也就是30分钟 --><property name="removeAbandonedTimeout" value="1800" /><!-- 关闭abanded连接时输出错误日志 --><property name="logAbandoned" value="true" /><!-- 监控数据库 --><!-- <property name="filters" value="stat" /> --><property name="filters" value="mergeStat" /></bean><!-- 数据源 写 --><bean id="writeDataSource" class="com.alibaba.druid.pool.DruidDataSource"init-method="init" destroy-method="close"><property name="url" value="${jdbc_url_writer}" /><property name="username" value="${jdbc_username_writer}" /><property name="password" value="${jdbc_password_writer}" /><!-- 初始化连接大小 --><property name="initialSize" value="2" /><!-- 连接池最大使用连接数量 --><property name="maxActive" value="1000" /><!-- 连接池最小空闲 --><property name="minIdle" value="2" /><!-- 获取连接最大等待时间 --><property name="maxWait" value="60000" /><!-- <property name="poolPreparedStatements" value="true" /> <propertyname="maxPoolPreparedStatementPerConnectionSize" value="33" /> --><property name="validationQuery" value="${validationQuery}" /><property name="testOnBorrow" value="false" /><property name="testOnReturn" value="false" /><property name="testWhileIdle" value="true" /><!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 --><property name="timeBetweenEvictionRunsMillis" value="60000" /><!-- 配置一个连接在池中最小生存的时间,单位是毫秒 --><property name="minEvictableIdleTimeMillis" value="25200000" /><!-- 打开removeAbandoned功能 --><property name="removeAbandoned" value="true" /><!-- 1800秒,也就是30分钟 --><property name="removeAbandonedTimeout" value="1800" /><!-- 关闭abanded连接时输出错误日志 --><property name="logAbandoned" value="true" /><!-- 监控数据库 --><!-- <property name="filters" value="stat" /> --><property name="filters" value="mergeStat" /></bean><!--写库配置 在写库中指定读库的列表,然后从库列表默认使用轮询的方式来使用读库 --><bean id="readWriteDataSource" class="io.dearas.datasource.ReadWriteDataSource"><property name="writeDataSource" ref="writeDataSource"/><property name="readDataSourceMap"><map><entry key="readDataSource1" value-ref="readDataSource1"/><entry key="readDataSource2" value-ref="readDataSource2"/></map></property><!--轮询方式 1为轮询,其它为随机--><property name="readDataSourcePollPattern" value="1" /></bean><!--配置事务管理,设置所有的读库为read-only="true" 在数据库切换中通过read-only进行判断是否使用读库 --><tx:advice id="txAdvice" transaction-manager="txManager"><tx:attributes><tx:method name="save*" propagation="REQUIRED" /><tx:method name="add*" propagation="REQUIRED" /><tx:method name="create*" propagation="REQUIRED" /><tx:method name="insert*" propagation="REQUIRED" /><tx:method name="update*" propagation="REQUIRED" /><tx:method name="merge*" propagation="REQUIRED" /><tx:method name="del*" propagation="REQUIRED" /><tx:method name="remove*" propagation="REQUIRED" /><tx:method name="query*" read-only="true"/><tx:method name="use*" read-only="true"/><tx:method name="get*" read-only="true" /><tx:method name="count*" read-only="true" /><tx:method name="find*" read-only="true" /><tx:method name="list*" read-only="true" /><tx:method name="*" propagation="REQUIRED"/></tx:attributes></tx:advice><!--Spring事务管理器 --><bean id="txManager"class="org.springframework.jdbc.datasource.DataSourceTransactionManager"><property name="dataSource" ref="readWriteDataSource" /></bean><!--读/写 库动态分配管理者 --><bean id="readWriteDataSourceTransactionProcessor" class="io.dearas.datasource.ReadWriteDataSourceProcessor"><property name="forceChoiceReadWhenWrite" value="false"/></bean><aop:config expose-proxy="true"><!-- 只对业务逻辑层实施事务 --><aop:pointcut id="txPointcut" expression="execution(* io.dearas.*.*(..))" /><aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/><!-- 通过AOP切面实现读/写库选择 order 用于定义与事务的切割顺序,order越小 越先被执行。保证在事务之前就已经进行了数据源的切换--><aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor"><aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/></aop:aspect></aop:config><bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><property name="dataSource" ref="readWriteDataSource" /></bean></beans>动态数据源
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136package io.dearas.datasource;import java.sql.Connection;import java.sql.SQLException;import java.util.Map;import java.util.Map.Entry;import java.util.concurrent.atomic.AtomicInteger;import javax.sql.DataSource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.InitializingBean;import org.springframework.jdbc.datasource.AbstractDataSource;import org.springframework.util.CollectionUtils;/**** 读/写动态选择数据库实现** 默认按顺序轮询使用读库* 默认选择写库** 一写多读、当写时默认读操作到写库、当写时强制读操作到读库**/public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean {private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSource.class);private DataSource writeDataSource; //DI写库private Map<String, DataSource> readDataSourceMap; //DI读库map//容器加载完毕之后初始化这两个数组方便以后使用private String[] readDataSourceNames;private DataSource[] readDataSources;private int readDataSourceCount; //读库的数量private int readDataSourcePollPattern = 0; //获取读数据源方式,0:随机,1:轮询public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {this.readDataSourcePollPattern = readDataSourcePollPattern;}private AtomicInteger counter = new AtomicInteger(1);/*** 设置读库(name, DataSource)* @param readDataSourceMap*/public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) {this.readDataSourceMap = readDataSourceMap;}public void setWriteDataSource(DataSource writeDataSource) {this.writeDataSource = writeDataSource;}/*** 容器初始化之后:* 因为xml配置文件中只注入了readDataSourceMap这个Map和写库的数据源writeDataSource,这里把map中的元素分解。* map中的key添加到String[] readDataSourceNames这个数组中,代表读库的名称。* map中的value添加到DataSource[] readDataSources这个数组中,代表读库的具体数据源。* @throws Exception*/public void afterPropertiesSet() throws Exception {System.out.println("Spring容器加载完毕。");if(writeDataSource == null) {throw new IllegalArgumentException("property 'writeDataSource' is required");}if(CollectionUtils.isEmpty(readDataSourceMap)) {throw new IllegalArgumentException("property 'readDataSourceMap' is required");}readDataSourceCount = readDataSourceMap.size();readDataSources = new DataSource[readDataSourceCount];readDataSourceNames = new String[readDataSourceCount];int i = 0;for(Entry<String, DataSource> e : readDataSourceMap.entrySet()) {readDataSources[i] = e.getValue();readDataSourceNames[i] = e.getKey();i++;}}private DataSource determineDataSource() {if(ReadWriteDataSourceDecision.isChoiceWrite()) {log.debug("current determine write datasource");return writeDataSource;}if(ReadWriteDataSourceDecision.isChoiceNone()) {log.debug("no choice read/write, default determine write datasource");return writeDataSource;}return determineReadDataSource();}private DataSource determineReadDataSource() {//按照顺序选择读库//TODO 算法改进int index;if(readDataSourcePollPattern == 1){index = counter.incrementAndGet() % readDataSourceCount;if(index < 0) {index = - index;}}else{// 随机方式/*** ThreadLocalRandom是JDK 7之后提供并发产生随机数,能够解决多个线程发生的竞争争夺。ThreadLocalRandom不是直接用new实例化,* 而是第一次使用其静态方法current()。从Math.random()改变到ThreadLocalRandom有如下好处:* 我们不再有从多个线程访问同一个随机数生成器实例的争夺。取代以前每个随机变量实例化一个随机数生成器实例,我们可以每个线程实例化一个。*/index = ThreadLocalRandom.current().nextInt(0, readDataSourceCount);}String dataSourceName = readDataSourceNames[index];log.debug("current determine read datasource : {} and index is : {}", dataSourceName,index);return readDataSources[index];}public Connection getConnection() throws SQLException {return determineDataSource().getConnection();}public Connection getConnection(String username, String password) throws SQLException {return determineDataSource().getConnection(username, password);}}Spring AOP切面
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172package io.dearas.datasource;import java.lang.reflect.Field;import java.util.HashMap;import java.util.Map;import java.util.Map.Entry;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.annotation.Around;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.BeansException;import org.springframework.beans.factory.config.BeanPostProcessor;import org.springframework.core.NestedRuntimeException;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;import org.springframework.transaction.annotation.Propagation;import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;import org.springframework.transaction.interceptor.TransactionAttribute;import org.springframework.transaction.interceptor.TransactionInterceptor;import org.springframework.util.PatternMatchUtils;import org.springframework.util.ReflectionUtils;/*** 这个类有两个功能:* 1.实现BeanPostProcessor接口,实例化每个bean对象之后,把有事务控制的bean对象摘取出来,并对它的方法进行判断,最终添加到一个map中。map的key为方法名,value为是否使用读库* 2.determineReadOrWriteDB方法作为AOP的切面,对使用读库还是写库进行标记。* 扩展点:* 1.通过<property name="forceChoiceReadWhenWrite" value="false"/> forceChoiceReadWhenWrite来配置在前一个操作时写操作时,下一个操作使用读库还是写库。(为了防止主从复制不及时现象)* 2.通过 <tx:method name="query*" read-only="true"/> 事务的read-only属性来判断是使用读库还是写库。* 这些判断操作都是在postProcessAfterInitialization()方法中完成的。*/public class ReadWriteDataSourceProcessor implements BeanPostProcessor {private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class);private boolean forceChoiceReadWhenWrite = false;private Map<String, Boolean> readMethodMap = new HashMap<String, Boolean>();/*** 当之前操作是写的时候,是否强制从从库读* 默认(false) 当之前操作是写,默认强制从写库读* @param forceReadOnWrite*/public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) {this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite;}public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {System.out.println("AfterInitialization,实例化之后的动作。"+beanName);//1.如果这个bean不是NameMatchTransactionAttributeSource类型,则跳过。表示是有事务的bean对象if(!(bean instanceof NameMatchTransactionAttributeSource)) {return bean;}try {//2. 获取事务的相关属性进行判断是否添加到readMethodMap中,添加到根据map的value是false和true判断是不是读库。NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource)bean;Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");nameMapField.setAccessible(true);Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource);for(Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute)entry.getValue();//仅对read-only的处理if(!attr.isReadOnly()) {continue;}String methodName = entry.getKey();Boolean isForceChoiceRead = Boolean.FALSE;if(forceChoiceReadWhenWrite) {//不管之前操作是写,默认强制从读库读 (设置为NOT_SUPPORTED即可)//NOT_SUPPORTED会挂起之前的事务attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value());isForceChoiceRead = Boolean.TRUE;} else {//否则 设置为SUPPORTS(这样可以参与到写事务)attr.setPropagationBehavior(Propagation.SUPPORTS.value());}log.debug("read/write transaction process method:{} force read:{}", methodName, isForceChoiceRead);readMethodMap.put(methodName, isForceChoiceRead);}} catch (Exception e) {throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e);}return bean;}public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {System.out.println("BeforeInitialization,实例化之前的动作。"+beanName);return bean;}private class ReadWriteDataSourceTransactionException extends NestedRuntimeException {public ReadWriteDataSourceTransactionException(String message, Throwable cause) {super(message, cause);}}/*** 数据源切面* @param pjp* @return* @throws Throwable*/public Object determineReadOrWriteDB(ProceedingJoinPoint pjp) throws Throwable {//pjp.getSignature().getName() 返回的是切入点的方法名if (isChoiceReadDB(pjp.getSignature().getName())) {ReadWriteDataSourceDecision.markRead(); //标记此方法为读库} else {ReadWriteDataSourceDecision.markWrite(); //标记此方法为写库}try {return pjp.proceed();} finally {ReadWriteDataSourceDecision.reset();}}/**** 判断是不是读库* @param methodName* @return*/private boolean isChoiceReadDB(String methodName) {String bestNameMatch = null;for (String mappedName : this.readMethodMap.keySet()) {if (isMatch(methodName, mappedName)) {bestNameMatch = mappedName;break;}}Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch);//表示强制选择 读 库if(isForceChoiceRead == Boolean.TRUE) {return true;}//如果之前选择了写库 现在还选择 写库if(ReadWriteDataSourceDecision.isChoiceWrite()) {return false;}//表示应该选择读库if(isForceChoiceRead != null) {return true;}//默认选择 写库return false;}protected boolean isMatch(String methodName, String mappedName) {return PatternMatchUtils.simpleMatch(mappedName, methodName);}}辅助类
1234567891011121314151617181920212223242526272829303132333435363738394041424344package io.dearas.datasource;/*** 读/写动态数据库 决策者* 根据DataSourceType是write/read 来决定是使用读/写数据库* 通过ThreadLocal绑定实现选择功能**/public class ReadWriteDataSourceDecision {public enum DataSourceType {write, read;}private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<DataSourceType>();public static void markWrite() {holder.set(DataSourceType.write);}public static void markRead() {holder.set(DataSourceType.read);}public static void reset() {holder.set(null);}public static boolean isChoiceNone() {return null == holder.get();}public static boolean isChoiceWrite() {return DataSourceType.write == holder.get();}public static boolean isChoiceRead() {return DataSourceType.read == holder.get();}}
可以在方式三中同样加入轮询算法的判断,使用不同的算法实现多读库,灵活多变。