knight_ka | 生活及学习笔记

SSM框架配置读写分离,一写多读的思路与实践。

SSM框架配置读写分离,一写多读的思路与实践

@(ssm)[数据库|读写分离|一写多读]

代码下载地址github:https://github.com/DearAS/SSM-Databases.git

事务是前提
一个项目中要使用多个数据库首先应该想到的问题是事务的问题,因为事务是建立在数据库的基础上的,如果在事务开启时和事务关闭时操作的数据库不是一个,那么这个事务也就不起作用了。所以要使事务起作用,我们的多个数据库就必须在事务开启之前确定好当前操作要用哪个数据库,所以说事务是前提。

前奏

要实现读写分离需要确定两点事情:

  • 根据什么来判断使用读库还是写库。(可以通过方法名、注解、或者事务的readOnly属性等来判断都可以)
  • 读库写库怎样进行实时切换。

在JAVA SSM项目中要是写读写分离可以有两种思路:

  1. 配置两种mybatis的sqlSessionFactory,不同的sqlSessionFactory对应不同的数据源。通过配置这两个sqlSessionFactory的sql映射文件,来使mybatis自动实现读写分离。
  2. 利用Spring的AOP进行代码的横向切割。在选择声明式事务切入之前就确定好要使用的数据源。

本文通过三种方式来实现ssm项目中的读写分离:推荐使用第三种方式

具体实现

方式一

这种方式比较简单:通过多个配置mybatis的sqlSessionFactory,每个sqlSessionFactory对应不同的事务管理器和事务切入点。
从而达到了不同的事务切入点 使用不同的数据源,通过配置这两个sqlSessionFactory的sql映射文件,来使mybatis自动实现读写分离。

但是这种做法的缺点显而易见:代码改动较大,不易维护,不易扩展。

需要确定的两点:
1.利用sql映射文件xml来判断使用的是读库还是写库。
2.每种不同的sqlSessionFactory对应不同的数据源,实现了数据源的切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>classpath:init-config.properties</value>
</property>
</bean>
<!-- enable component scanning (beware that this does not enable mapper scanning!) -->
<context:component-scan base-package="io.dearas.mybatis" />
<!-- enable autowire -->
<context:annotation-config />
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<!-- 基本属性 url、user、password -->
<property name="url" value="${dataSource.url}" />
<property name="username" value="${dataSource.username}" />
<property name="password" value="${dataSource.password}" />
<property name="connectionProperties" value="${dataSource.driver}"></property>
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="1" />
<property name="minIdle" value="1" />
<property name="maxActive" value="20" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="SELECT 'x'" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize"
value="20" />
<!-- 配置监控统计拦截的filters -->
<property name="filters" value="stat" />
</bean>
<!-- define the SqlSessionFactory -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="mapperLocations" value="classpath:mapper/write/*.xml"/>
</bean>
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
<property name="basePackage" value="org.zhuc.mybatis.mapper" />
</bean>
<!-- transaction manager, use JtaTransactionManager for global tx -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
<qualifier value="isap" />
</bean>
<!-- 全注解方式 需加上@Transactional -->
<tx:annotation-driven transaction-manager="transactionManager" />
<!-- 事务控制的业务方法配 -->
<!--
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="get*" read-only="true" />
<tx:method name="page*" read-only="true" />
<tx:method name="list*" read-only="true" />
<tx:method name="*" />
</tx:attributes>
</tx:advice>
-->
<!-- 事务控制拦截 -->
<!--
<aop:config proxy-target-class="true">
<aop:advisor pointcut="execution(* id.dearas.*.*(..))"
advice-ref="txAdvice" />
</aop:config>
-->
<!-- =================================================================== -->
<!-- 数据源2 -->
<bean id="dataSource2" class="com.alibaba.druid.pool.DruidDataSource"
init-method="init" destroy-method="close">
<!-- 基本属性 url、user、password -->
<property name="url" value="${dataSource2.url}" />
<property name="username" value="${dataSource2.username}" />
<property name="password" value="${dataSource2.password}" />
<property name="connectionProperties" value="${dataSource2.driver}"></property>
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="1" />
<property name="minIdle" value="1" />
<property name="maxActive" value="20" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="SELECT 'x'" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize"
value="20" />
<!-- 配置监控统计拦截的filters -->
<property name="filters" value="stat" />
</bean>
<bean id="sqlSessionFactory2" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource2" />
<property name="mapperLocations" value="classpath:mapper/read/*.xml"/>
</bean>
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory2"/>
<property name="basePackage" value="org.zhuc.mybatis.mapper2" />
</bean>
<bean id="transactionManager2"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource2" />
<qualifier value="insurance" />
</bean>
<!-- 全注解方式 -->
<tx:annotation-driven transaction-manager="transactionManager2" />
<!--
注解的使用方式:
@Transactional(value = "insurance", rollbackFor = Exception.class)
这里的value指的就是qualifier的value.根据value的不同选择不同的事务管理器。
-->
</beans>

这种方式只是一个思路,不建议使用,所以只给出了配置文件。可以利用注释中的内容完成实际开发。

方式二

利用Spring的AOP实现数据源的动态切换。利用方法名上打自定义注解的方式来判断具体要使用的数据源(如:@DataSource(value=”read”)),然后利用如AbstractRoutingDataSource实现数据源的切换。

Spring提供了很多实现这一需求的口,如AbstractRoutingDataSource,只要继承了AbstractRoutingDataSource,然后重写determineCurrentLookupKey方法返回数据源的名字即可动态切换数据源。

在determineCurrentLookupKey()之前就利用切面确定好数据源的名称,可以在在determineCurrentLookupKey()中操作轮询算法实现多读配置。

需要确定的两点:

  1. 利用方法名上的注解来确定是用读库还是写库。
  2. 利用Spring提供的重写AbstractRoutingDataSource的determineCurrentLookupKey方法,它返回的是配置文件中的dataSource对应的名称,然后Spring根据不同的名称选取不同的数据源。

本方式主要是

  • spring-context.xml配置文件详解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop-4.1.xsd">
    <bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
    <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
    <!-- 配置获取连接等待超时的时间 -->
    <property name="maxWait" value="60000"/>
    <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
    <property name="timeBetweenEvictionRunsMillis" value="60000"/>
    <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
    <property name="minEvictableIdleTimeMillis" value="300000"/>
    <property name="validationQuery" value="SELECT 'x'"/>
    <property name="testWhileIdle" value="true"/>
    <property name="testOnBorrow" value="false"/>
    <property name="testOnReturn" value="false"/>
    <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
    <property name="poolPreparedStatements" value="true"/>
    <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
    <property name="filters" value="config"/>
    <property name="connectionProperties" value="config.decrypt=true" />
    </bean>
    <bean id="dataSourceRead1" parent="abstractDataSource">
    <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
    <!-- 基本属性 url、user、password -->
    <property name="url" value="${read1.jdbc.url}"/>
    <property name="username" value="${read1.jdbc.user}"/>
    <property name="password" value="${read1.jdbc.password}"/>
    <!-- 配置初始化大小、最小、最大 -->
    <property name="initialSize" value="${read1.jdbc.initPoolSize}"/>
    <property name="minIdle" value="${read1.jdbc.minPoolSize}"/>
    <property name="maxActive" value="${read1.jdbc.maxPoolSize}"/>
    </bean>
    <bean id="dataSourceRead2" parent="abstractDataSource">
    <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
    <!-- 基本属性 url、user、password -->
    <property name="url" value="${read2.jdbc.url}"/>
    <property name="username" value="${read2.jdbc.user}"/>
    <property name="password" value="${read2.jdbc.password}"/>
    <!-- 配置初始化大小、最小、最大 -->
    <property name="initialSize" value="${read2.jdbc.initPoolSize}"/>
    <property name="minIdle" value="${read2.jdbc.minPoolSize}"/>
    <property name="maxActive" value="${read2.jdbc.maxPoolSize}"/>
    </bean>
    <bean id="dataSourceWrite" parent="abstractDataSource">
    <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
    <!-- 基本属性 url、user、password -->
    <property name="url" value="${write.jdbc.url}"/>
    <property name="username" value="${write.jdbc.user}"/>
    <property name="password" value="${write.jdbc.password}"/>
    <!-- 配置初始化大小、最小、最大 -->
    <property name="initialSize" value="${write.jdbc.initPoolSize}"/>
    <property name="minIdle" value="${write.jdbc.minPoolSize}"/>
    <property name="maxActive" value="${write.jdbc.maxPoolSize}"/>
    </bean>
    <bean id="dataSource" class="io.dearas.datasource2.DynamicDataSource">
    <property name="writeDataSource" ref="dataSourceWrite" />
    <property name="readDataSources">
    <list>
    <ref bean="dataSourceRead1" />
    <ref bean="dataSourceRead2" />
    </list>
    </property>
    <!--轮询方式 1为轮询,其它为随机-->
    <property name="readDataSourcePollPattern" value="1" />
    <property name="defaultTargetDataSource" ref="dataSourceWrite"/>
    </bean>
    <tx:annotation-driven transaction-manager="transactionManager"/>
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
    </bean>
    <!-- 针对myBatis的配置项 -->
    <!-- 配置sqlSessionFactory -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <!-- 实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件 -->
    <property name="dataSource" ref="dataSource"/>
    <property name="mapperLocations" value="classpath:mapper/*.xml"/>
    </bean>
    <!-- 配置扫描器 -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
    <!-- 扫描包以及它的子包下的所有映射接口类 -->
    <property name="basePackage" value="com.test.api.dao.inte"/>
    <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
    </bean>
    <!-- 配置数据库注解aop -->
    <bean id="dynamicDataSourceAspect" class="io.dearas.datasource2.DynamicDataSourceAspect" />
    <!--配置事务管理,设置所有的读库为read-only="true" 在数据库切换中通过read-only进行判断是否使用读库 -->
    <tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
    <tx:method name="save*" propagation="REQUIRED" />
    <tx:method name="add*" propagation="REQUIRED" />
    <tx:method name="create*" propagation="REQUIRED" />
    <tx:method name="insert*" propagation="REQUIRED" />
    <tx:method name="update*" propagation="REQUIRED" />
    <tx:method name="merge*" propagation="REQUIRED" />
    <tx:method name="del*" propagation="REQUIRED" />
    <tx:method name="remove*" propagation="REQUIRED" />
    <tx:method name="query*" read-only="true"/>
    <tx:method name="use*" read-only="true"/>
    <tx:method name="get*" read-only="true" />
    <tx:method name="count*" read-only="true" />
    <tx:method name="find*" read-only="true" />
    <tx:method name="list*" read-only="true" />
    <tx:method name="*" propagation="REQUIRED"/>
    </tx:attributes>
    </tx:advice>
    <aop:config expose-proxy="true">
    <!-- 只对业务逻辑层实施事务 -->
    <aop:pointcut id="txPointcut" expression="execution(* io.dearas.servie.*(..))" />
    <aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/>
    <!-- 通过AOP切面实现读/写库选择 order 用于定义与事务的切割顺序,order越小 越先被执行。保证在事务之前就已经进行了数据源的切换-->
    <aop:aspect id="c" order="-2147483648" ref="dynamicDataSourceAspect">
    <aop:before pointcut-ref="txPointcut" method="before"/>
    <aop:after pointcut-ref="txPointcut" method="after"/>
    </aop:aspect>
    </aop:config>
    </beans>
  • 动态数据源实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    package io.dearas.datasource2;
    import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    /**
    * 动态数据源实现读写分离
    */
    public class DynamicDataSource extends AbstractRoutingDataSource {
    private Object writeDataSource; //写数据源
    private List<Object> readDataSources; //多个读数据源
    private int readDataSourceSize; //读数据源个数
    private int readDataSourcePollPattern = 0; //获取读数据源方式,0:随机,1:轮询
    private AtomicLong counter = new AtomicLong(0);
    private static final Long MAX_POOL = Long.MAX_VALUE;
    private final Lock lock = new ReentrantLock();
    @Override
    public void afterPropertiesSet() {
    if (this.writeDataSource == null) {
    throw new IllegalArgumentException("Property 'writeDataSource' is required");
    }
    setDefaultTargetDataSource(writeDataSource);
    Map<Object, Object> targetDataSources = new HashMap<>();
    targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);
    if (this.readDataSources == null) {
    readDataSourceSize = 0;
    } else {
    for(int i=0; i<readDataSources.size(); i++) {
    targetDataSources.put(DynamicDataSourceGlobal.READ.name() + i, readDataSources.get(i));
    }
    readDataSourceSize = readDataSources.size();
    }
    setTargetDataSources(targetDataSources);
    super.afterPropertiesSet();
    }
    @Override
    protected Object determineCurrentLookupKey() {
    DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();
    if(dynamicDataSourceGlobal == null
    || dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE
    || readDataSourceSize <= 0) {
    return DynamicDataSourceGlobal.WRITE.name();
    }
    int index = 1;
    if(readDataSourcePollPattern == 1) {
    //轮询方式
    long currValue = counter.incrementAndGet();
    if((currValue + 1) >= MAX_POOL) {
    try {
    lock.lock();
    if((currValue + 1) >= MAX_POOL) {
    counter.set(0);
    }
    } finally {
    lock.unlock();
    }
    }
    index = (int) (currValue % readDataSourceSize);
    } else {
    //随机方式
    index = ThreadLocalRandom.current().nextInt(0, readDataSourceSize);
    }
    return dynamicDataSourceGlobal.name() + index;
    }
    public void setWriteDataSource(Object writeDataSource) {
    this.writeDataSource = writeDataSource;
    }
    public void setReadDataSources(List<Object> readDataSources) {
    this.readDataSources = readDataSources;
    }
    public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {
    this.readDataSourcePollPattern = readDataSourcePollPattern;
    }
    }
  • spring AOP切面

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package io.dearas.datasource2;
    import org.apache.log4j.Logger;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.reflect.MethodSignature;
    import java.lang.reflect.Method;
    /**
    * 定义选择数据源切面
    */
    public class DynamicDataSourceAspect {
    private static final Logger logger = Logger.getLogger(DynamicDataSourceAspect.class);
    public void pointCut(){};
    public void before(JoinPoint point)
    {
    Object target = point.getTarget();
    String methodName = point.getSignature().getName();
    Class<?>[] clazz = target.getClass().getInterfaces();
    Class<?>[] parameterTypes = ((MethodSignature) point.getSignature()).getMethod().getParameterTypes();
    try {
    Method method = clazz[0].getMethod(methodName, parameterTypes);
    if (method != null && method.isAnnotationPresent(DataSource.class)) {
    DataSource data = method.getAnnotation(DataSource.class);
    DynamicDataSourceHolder.putDataSource(data.value());
    }
    } catch (Exception e) {
    logger.error(String.format("Choose DataSource error, method:%s, msg:%s", methodName, e.getMessage()));
    }
    }
    public void after(JoinPoint point) {
    DynamicDataSourceHolder.clearDataSource();
    }
    }
  • 一些其它的辅助类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    package io.dearas.datasource2;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DataSource {
    public DynamicDataSourceGlobal value() default DynamicDataSourceGlobal.READ;
    }
1
2
3
4
5
6
package io.dearas.datasource2;
//定义枚举类型,读写
public enum DynamicDataSourceGlobal {
READ, WRITE;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package io.dearas.datasource2;
public class DynamicDataSourceHolder {
private static final ThreadLocal<DynamicDataSourceGlobal> holder = new ThreadLocal<DynamicDataSourceGlobal>();
public static void putDataSource(DynamicDataSourceGlobal dataSource){
holder.set(dataSource);
}
public static DynamicDataSourceGlobal getDataSource(){
return holder.get();
}
public static void clearDataSource() {
holder.remove();
}
}

轮询算法的讲解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
if(readDataSourcePollPattern == 1) {
/**
* 为了保证线程安全,count是AtomicLong类型,是原子变量。
* incrementAndGet() 相当于 ++i 先+1后引用
* getAndIncrement() 相当于 i++ 先引用后+1
*/
long currValue = counter.incrementAndGet();
if((currValue + 1) >= MAX_POOL) {
try {
/**
* Lock lock = new ReentrantLock();
* 使用java lock机制
*/
lock.lock();
if((currValue + 1) >= MAX_POOL) { //再次判断
counter.set(0);
}
} finally {
lock.unlock();
}
}
index = (int) (currValue % readDataSourceSize);
} else {
// 随机方式
/**
* ThreadLocalRandom是JDK 7之后提供并发产生随机数,能够解决多个线程发生的竞争争夺。ThreadLocalRandom不是直接用new实例化,
* 而是第一次使用其静态方法current()。从Math.random()改变到ThreadLocalRandom有如下好处:
* 我们不再有从多个线程访问同一个随机数生成器实例的争夺。取代以前每个随机变量实例化一个随机数生成器实例,我们可以每个线程实例化一个。
*/
index = ThreadLocalRandom.current().nextInt(0, readDataSourceSize);
}

方式三

利用Spring的AOP实现数据源的动态切换。
除了可以利用如AbstractRoutingDataSource之外还可以利用AbstractDataSource来重写getConnection()方法,来实现数据源的读写分离和动态切换。根据事务的配置,如果不是readOnly方法则使用写库,如果是readOnly的方法,再进行后续的逻辑判断。
如配置之前是写操作,则本次也使用写库进行读取。

需要确定的两点:

  1. 根据事务的readOnly判断使用读库还是写库。
  2. 继承AbstarctDataSource重写getConnection()方法,通过返回不同的连接来进行数据源的切换。

此方式实现过程可能比较复杂,整体流程分为以下步骤:
1、spring容器加载完成之后,把依赖注入的map中的key添加到String[] readDataSourceNames这个数组中,代表读库的名称。map中的value添加到DataSource[] readDataSources这个数组中,代表读库的具体数据源。

2、spring容器中每个bean对象被初始化完成之后(因为bean被初始化完成之后,AOP切面已经切入进去,所以可以获取到切面中的内容):如果这个bean对象是有事务处理的则进行判断事务的类型来决定具体使用读库还是写库。
如果是读库则把方法名添加到map中,key为方法名,value为true 表示这个方法使用读库,如果是写库则使用false。
这个步骤最终把所有的方法名及使用的数据源类型都添加到了一个map中。当真正调用的时候根据方法名在map中查找,最后确定要使用的数据源。

3、当真正调用方法的时候AOP中的切面做的事情:根据当前的方法名判断要使用的是读库还是写库,进行一个标记。

4、因为确定数据源切入点要比事务的切入先执行(通过配置文件中的order属性进行控制),所以在步骤3执行完毕之后,开启事务等要获取数据库连接就会调用getConnection()方法。
因为我们又对getConnection()方法进行了重写,所以在重写getConnection()中使用具体的数据源连接池获取具体的connection。从而达到了数据库连接的切换。还可以在getConnection中用轮询算法进行多读库的使用。非常灵活。

  • spring-context.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:mybatis-spring="http://mybatis.org/schema/mybatis-spring"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.2.xsd
    http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring-1.2.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd">
    <context:property-placeholder
    location="classpath:dbconfig.properties" />
    <!--此配置为配置数据库读写分离 实现多读库-->
    <!-- 数据源 读 -->
    <bean id="readDataSource1" class="com.alibaba.druid.pool.DruidDataSource"
    init-method="init" destroy-method="close">
    <property name="url" value="${jdbc_url}" />
    <property name="username" value="${jdbc_username}" />
    <property name="password" value="${jdbc_password}" />
    <!-- 初始化连接大小 -->
    <property name="initialSize" value="2" />
    <!-- 连接池最大使用连接数量 -->
    <property name="maxActive" value="1000" />
    <!-- 连接池最小空闲 -->
    <property name="minIdle" value="2" />
    <!-- 获取连接最大等待时间 -->
    <property name="maxWait" value="60000" />
    <!-- <property name="poolPreparedStatements" value="true" /> <property
    name="maxPoolPreparedStatementPerConnectionSize" value="33" /> -->
    <property name="validationQuery" value="${validationQuery}" />
    <property name="testOnBorrow" value="false" />
    <property name="testOnReturn" value="false" />
    <property name="testWhileIdle" value="true" />
    <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
    <property name="timeBetweenEvictionRunsMillis" value="60000" />
    <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
    <property name="minEvictableIdleTimeMillis" value="25200000" />
    <!-- 打开removeAbandoned功能 -->
    <property name="removeAbandoned" value="true" />
    <!-- 1800秒,也就是30分钟 -->
    <property name="removeAbandonedTimeout" value="1800" />
    <!-- 关闭abanded连接时输出错误日志 -->
    <property name="logAbandoned" value="true" />
    <!-- 监控数据库 -->
    <!-- <property name="filters" value="stat" /> -->
    <property name="filters" value="mergeStat" />
    </bean>
    <!-- 数据源 读 -->
    <bean id="readDataSource2" class="com.alibaba.druid.pool.DruidDataSource"
    init-method="init" destroy-method="close">
    <property name="url" value="${jdbc_url}" />
    <property name="username" value="${jdbc_username}" />
    <property name="password" value="${jdbc_password}" />
    <!-- 初始化连接大小 -->
    <property name="initialSize" value="2" />
    <!-- 连接池最大使用连接数量 -->
    <property name="maxActive" value="1000" />
    <!-- 连接池最小空闲 -->
    <property name="minIdle" value="2" />
    <!-- 获取连接最大等待时间 -->
    <property name="maxWait" value="60000" />
    <!-- <property name="poolPreparedStatements" value="true" /> <property
    name="maxPoolPreparedStatementPerConnectionSize" value="33" /> -->
    <property name="validationQuery" value="${validationQuery}" />
    <property name="testOnBorrow" value="false" />
    <property name="testOnReturn" value="false" />
    <property name="testWhileIdle" value="true" />
    <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
    <property name="timeBetweenEvictionRunsMillis" value="60000" />
    <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
    <property name="minEvictableIdleTimeMillis" value="25200000" />
    <!-- 打开removeAbandoned功能 -->
    <property name="removeAbandoned" value="true" />
    <!-- 1800秒,也就是30分钟 -->
    <property name="removeAbandonedTimeout" value="1800" />
    <!-- 关闭abanded连接时输出错误日志 -->
    <property name="logAbandoned" value="true" />
    <!-- 监控数据库 -->
    <!-- <property name="filters" value="stat" /> -->
    <property name="filters" value="mergeStat" />
    </bean>
    <!-- 数据源 写 -->
    <bean id="writeDataSource" class="com.alibaba.druid.pool.DruidDataSource"
    init-method="init" destroy-method="close">
    <property name="url" value="${jdbc_url_writer}" />
    <property name="username" value="${jdbc_username_writer}" />
    <property name="password" value="${jdbc_password_writer}" />
    <!-- 初始化连接大小 -->
    <property name="initialSize" value="2" />
    <!-- 连接池最大使用连接数量 -->
    <property name="maxActive" value="1000" />
    <!-- 连接池最小空闲 -->
    <property name="minIdle" value="2" />
    <!-- 获取连接最大等待时间 -->
    <property name="maxWait" value="60000" />
    <!-- <property name="poolPreparedStatements" value="true" /> <property
    name="maxPoolPreparedStatementPerConnectionSize" value="33" /> -->
    <property name="validationQuery" value="${validationQuery}" />
    <property name="testOnBorrow" value="false" />
    <property name="testOnReturn" value="false" />
    <property name="testWhileIdle" value="true" />
    <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
    <property name="timeBetweenEvictionRunsMillis" value="60000" />
    <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
    <property name="minEvictableIdleTimeMillis" value="25200000" />
    <!-- 打开removeAbandoned功能 -->
    <property name="removeAbandoned" value="true" />
    <!-- 1800秒,也就是30分钟 -->
    <property name="removeAbandonedTimeout" value="1800" />
    <!-- 关闭abanded连接时输出错误日志 -->
    <property name="logAbandoned" value="true" />
    <!-- 监控数据库 -->
    <!-- <property name="filters" value="stat" /> -->
    <property name="filters" value="mergeStat" />
    </bean>
    <!--写库配置 在写库中指定读库的列表,然后从库列表默认使用轮询的方式来使用读库 -->
    <bean id="readWriteDataSource" class="io.dearas.datasource.ReadWriteDataSource">
    <property name="writeDataSource" ref="writeDataSource"/>
    <property name="readDataSourceMap">
    <map>
    <entry key="readDataSource1" value-ref="readDataSource1"/>
    <entry key="readDataSource2" value-ref="readDataSource2"/>
    </map>
    </property>
    <!--轮询方式 1为轮询,其它为随机-->
    <property name="readDataSourcePollPattern" value="1" />
    </bean>
    <!--配置事务管理,设置所有的读库为read-only="true" 在数据库切换中通过read-only进行判断是否使用读库 -->
    <tx:advice id="txAdvice" transaction-manager="txManager">
    <tx:attributes>
    <tx:method name="save*" propagation="REQUIRED" />
    <tx:method name="add*" propagation="REQUIRED" />
    <tx:method name="create*" propagation="REQUIRED" />
    <tx:method name="insert*" propagation="REQUIRED" />
    <tx:method name="update*" propagation="REQUIRED" />
    <tx:method name="merge*" propagation="REQUIRED" />
    <tx:method name="del*" propagation="REQUIRED" />
    <tx:method name="remove*" propagation="REQUIRED" />
    <tx:method name="query*" read-only="true"/>
    <tx:method name="use*" read-only="true"/>
    <tx:method name="get*" read-only="true" />
    <tx:method name="count*" read-only="true" />
    <tx:method name="find*" read-only="true" />
    <tx:method name="list*" read-only="true" />
    <tx:method name="*" propagation="REQUIRED"/>
    </tx:attributes>
    </tx:advice>
    <!--Spring事务管理器 -->
    <bean id="txManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="readWriteDataSource" />
    </bean>
    <!--读/写 库动态分配管理者 -->
    <bean id="readWriteDataSourceTransactionProcessor" class="io.dearas.datasource.ReadWriteDataSourceProcessor">
    <property name="forceChoiceReadWhenWrite" value="false"/>
    </bean>
    <aop:config expose-proxy="true">
    <!-- 只对业务逻辑层实施事务 -->
    <aop:pointcut id="txPointcut" expression="execution(* io.dearas.*.*(..))" />
    <aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/>
    <!-- 通过AOP切面实现读/写库选择 order 用于定义与事务的切割顺序,order越小 越先被执行。保证在事务之前就已经进行了数据源的切换-->
    <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
    <aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/>
    </aop:aspect>
    </aop:config>
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="readWriteDataSource" />
    </bean>
    </beans>
  • 动态数据源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    package io.dearas.datasource;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.atomic.AtomicInteger;
    import javax.sql.DataSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.jdbc.datasource.AbstractDataSource;
    import org.springframework.util.CollectionUtils;
    /**
    *
    * 读/写动态选择数据库实现
    *
    * 默认按顺序轮询使用读库
    * 默认选择写库
    *
    * 一写多读、当写时默认读操作到写库、当写时强制读操作到读库
    *
    */
    public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSource.class);
    private DataSource writeDataSource; //DI写库
    private Map<String, DataSource> readDataSourceMap; //DI读库map
    //容器加载完毕之后初始化这两个数组方便以后使用
    private String[] readDataSourceNames;
    private DataSource[] readDataSources;
    private int readDataSourceCount; //读库的数量
    private int readDataSourcePollPattern = 0; //获取读数据源方式,0:随机,1:轮询
    public void setReadDataSourcePollPattern(int readDataSourcePollPattern) {
    this.readDataSourcePollPattern = readDataSourcePollPattern;
    }
    private AtomicInteger counter = new AtomicInteger(1);
    /**
    * 设置读库(name, DataSource)
    * @param readDataSourceMap
    */
    public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) {
    this.readDataSourceMap = readDataSourceMap;
    }
    public void setWriteDataSource(DataSource writeDataSource) {
    this.writeDataSource = writeDataSource;
    }
    /**
    * 容器初始化之后:
    * 因为xml配置文件中只注入了readDataSourceMap这个Map和写库的数据源writeDataSource,这里把map中的元素分解。
    * map中的key添加到String[] readDataSourceNames这个数组中,代表读库的名称。
    * map中的value添加到DataSource[] readDataSources这个数组中,代表读库的具体数据源。
    * @throws Exception
    */
    @Override
    public void afterPropertiesSet() throws Exception {
    System.out.println("Spring容器加载完毕。");
    if(writeDataSource == null) {
    throw new IllegalArgumentException("property 'writeDataSource' is required");
    }
    if(CollectionUtils.isEmpty(readDataSourceMap)) {
    throw new IllegalArgumentException("property 'readDataSourceMap' is required");
    }
    readDataSourceCount = readDataSourceMap.size();
    readDataSources = new DataSource[readDataSourceCount];
    readDataSourceNames = new String[readDataSourceCount];
    int i = 0;
    for(Entry<String, DataSource> e : readDataSourceMap.entrySet()) {
    readDataSources[i] = e.getValue();
    readDataSourceNames[i] = e.getKey();
    i++;
    }
    }
    private DataSource determineDataSource() {
    if(ReadWriteDataSourceDecision.isChoiceWrite()) {
    log.debug("current determine write datasource");
    return writeDataSource;
    }
    if(ReadWriteDataSourceDecision.isChoiceNone()) {
    log.debug("no choice read/write, default determine write datasource");
    return writeDataSource;
    }
    return determineReadDataSource();
    }
    private DataSource determineReadDataSource() {
    //按照顺序选择读库
    //TODO 算法改进
    int index;
    if(readDataSourcePollPattern == 1){
    index = counter.incrementAndGet() % readDataSourceCount;
    if(index < 0) {
    index = - index;
    }
    }else{
    // 随机方式
    /**
    * ThreadLocalRandom是JDK 7之后提供并发产生随机数,能够解决多个线程发生的竞争争夺。ThreadLocalRandom不是直接用new实例化,
    * 而是第一次使用其静态方法current()。从Math.random()改变到ThreadLocalRandom有如下好处:
    * 我们不再有从多个线程访问同一个随机数生成器实例的争夺。取代以前每个随机变量实例化一个随机数生成器实例,我们可以每个线程实例化一个。
    */
    index = ThreadLocalRandom.current().nextInt(0, readDataSourceCount);
    }
    String dataSourceName = readDataSourceNames[index];
    log.debug("current determine read datasource : {} and index is : {}", dataSourceName,index);
    return readDataSources[index];
    }
    @Override
    public Connection getConnection() throws SQLException {
    return determineDataSource().getConnection();
    }
    @Override
    public Connection getConnection(String username, String password) throws SQLException {
    return determineDataSource().getConnection(username, password);
    }
    }
  • Spring AOP切面

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    package io.dearas.datasource;
    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.core.NestedRuntimeException;
    import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
    import org.springframework.transaction.annotation.Propagation;
    import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
    import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
    import org.springframework.transaction.interceptor.TransactionAttribute;
    import org.springframework.transaction.interceptor.TransactionInterceptor;
    import org.springframework.util.PatternMatchUtils;
    import org.springframework.util.ReflectionUtils;
    /**
    * 这个类有两个功能:
    * 1.实现BeanPostProcessor接口,实例化每个bean对象之后,把有事务控制的bean对象摘取出来,并对它的方法进行判断,最终添加到一个map中。map的key为方法名,value为是否使用读库
    * 2.determineReadOrWriteDB方法作为AOP的切面,对使用读库还是写库进行标记。
    * 扩展点:
    * 1.通过<property name="forceChoiceReadWhenWrite" value="false"/> forceChoiceReadWhenWrite来配置在前一个操作时写操作时,下一个操作使用读库还是写库。(为了防止主从复制不及时现象)
    * 2.通过 <tx:method name="query*" read-only="true"/> 事务的read-only属性来判断是使用读库还是写库。
    * 这些判断操作都是在postProcessAfterInitialization()方法中完成的。
    */
    public class ReadWriteDataSourceProcessor implements BeanPostProcessor {
    private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class);
    private boolean forceChoiceReadWhenWrite = false;
    private Map<String, Boolean> readMethodMap = new HashMap<String, Boolean>();
    /**
    * 当之前操作是写的时候,是否强制从从库读
    * 默认(false) 当之前操作是写,默认强制从写库读
    * @param forceReadOnWrite
    */
    public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) {
    this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite;
    }
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    System.out.println("AfterInitialization,实例化之后的动作。"+beanName);
    //1.如果这个bean不是NameMatchTransactionAttributeSource类型,则跳过。表示是有事务的bean对象
    if(!(bean instanceof NameMatchTransactionAttributeSource)) {
    return bean;
    }
    try {
    //2. 获取事务的相关属性进行判断是否添加到readMethodMap中,添加到根据map的value是false和true判断是不是读库。
    NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource)bean;
    Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
    nameMapField.setAccessible(true);
    Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource);
    for(Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {
    RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute)entry.getValue();
    //仅对read-only的处理
    if(!attr.isReadOnly()) {
    continue;
    }
    String methodName = entry.getKey();
    Boolean isForceChoiceRead = Boolean.FALSE;
    if(forceChoiceReadWhenWrite) {
    //不管之前操作是写,默认强制从读库读 (设置为NOT_SUPPORTED即可)
    //NOT_SUPPORTED会挂起之前的事务
    attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value());
    isForceChoiceRead = Boolean.TRUE;
    } else {
    //否则 设置为SUPPORTS(这样可以参与到写事务)
    attr.setPropagationBehavior(Propagation.SUPPORTS.value());
    }
    log.debug("read/write transaction process method:{} force read:{}", methodName, isForceChoiceRead);
    readMethodMap.put(methodName, isForceChoiceRead);
    }
    } catch (Exception e) {
    throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e);
    }
    return bean;
    }
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    System.out.println("BeforeInitialization,实例化之前的动作。"+beanName);
    return bean;
    }
    private class ReadWriteDataSourceTransactionException extends NestedRuntimeException {
    public ReadWriteDataSourceTransactionException(String message, Throwable cause) {
    super(message, cause);
    }
    }
    /**
    * 数据源切面
    * @param pjp
    * @return
    * @throws Throwable
    */
    public Object determineReadOrWriteDB(ProceedingJoinPoint pjp) throws Throwable {
    //pjp.getSignature().getName() 返回的是切入点的方法名
    if (isChoiceReadDB(pjp.getSignature().getName())) {
    ReadWriteDataSourceDecision.markRead(); //标记此方法为读库
    } else {
    ReadWriteDataSourceDecision.markWrite(); //标记此方法为写库
    }
    try {
    return pjp.proceed();
    } finally {
    ReadWriteDataSourceDecision.reset();
    }
    }
    /***
    * 判断是不是读库
    * @param methodName
    * @return
    */
    private boolean isChoiceReadDB(String methodName) {
    String bestNameMatch = null;
    for (String mappedName : this.readMethodMap.keySet()) {
    if (isMatch(methodName, mappedName)) {
    bestNameMatch = mappedName;
    break;
    }
    }
    Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch);
    //表示强制选择 读 库
    if(isForceChoiceRead == Boolean.TRUE) {
    return true;
    }
    //如果之前选择了写库 现在还选择 写库
    if(ReadWriteDataSourceDecision.isChoiceWrite()) {
    return false;
    }
    //表示应该选择读库
    if(isForceChoiceRead != null) {
    return true;
    }
    //默认选择 写库
    return false;
    }
    protected boolean isMatch(String methodName, String mappedName) {
    return PatternMatchUtils.simpleMatch(mappedName, methodName);
    }
    }
  • 辅助类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    package io.dearas.datasource;
    /**
    * 读/写动态数据库 决策者
    * 根据DataSourceType是write/read 来决定是使用读/写数据库
    * 通过ThreadLocal绑定实现选择功能
    *
    */
    public class ReadWriteDataSourceDecision {
    public enum DataSourceType {
    write, read;
    }
    private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<DataSourceType>();
    public static void markWrite() {
    holder.set(DataSourceType.write);
    }
    public static void markRead() {
    holder.set(DataSourceType.read);
    }
    public static void reset() {
    holder.set(null);
    }
    public static boolean isChoiceNone() {
    return null == holder.get();
    }
    public static boolean isChoiceWrite() {
    return DataSourceType.write == holder.get();
    }
    public static boolean isChoiceRead() {
    return DataSourceType.read == holder.get();
    }
    }

可以在方式三中同样加入轮询算法的判断,使用不同的算法实现多读库,灵活多变。