博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring主从数据源动态切换
阅读量:4705 次
发布时间:2019-06-10

本文共 9271 字,大约阅读时间需要 30 分钟。

参考文档:
 
我们的需求达到的目标和现有的条件:
 
  • 不同类型数据源都可能存在master和slave区分;
  • 数据源之间已经可以通过package区分,不同package对应的service也不同;
  • aop在service层面,对应不同数据源的service之间可能存在互相调用;
  • 最外层方法的名称决定了该数据源应该使用master(可写)还是slave数据源(不可写);
  • 在嵌套使用其他service的过程中,根据情况分析该service方法是否使用slave数据源;
 
我们在spring中的配置文件中使用了切面式的配置来定义声明式事务:
 
 
 
在aop:config中,只对业务逻辑层实施事务管理,此时需要定义pointCut:用于确定实行动态织入用到的方法条件,和advisor:用于确定方法中使用到的事务管理器,事务管理器中定义了各种类型方法前缀所定义的事务范围和传播属性。
 
我们的数据源需要动态定义,需要在事务开启之前,切入数据源去选择该方法执行过程到底是该使用读库还是写库来开启该事务,因此这个切面需要在org.springframework.transaction.interceptor.TransactionInterceptor之前就要起作用。
 
 

实现方案

 
完成的整体类图如下:
 
 
 
从AbstractDataSource中继承,增加写库以及多个从库,以便于在spring配置文件中能够配置该数据源,由于需要支持多个数据源的master/slave主从库配置,所以MasterSlaveDataSourceDecision中不能简单地定义静态ThreadLocal变量来维持当前事务状态,而且每个TransactionManager都需要定义对应的数据源决策类。
 
public class MasterSlaveDataSource extends AbstractDataSource implements InitializingBean {    private static final Logger log = LoggerFactory.getLogger(MasterSlaveDataSource.class);    private DataSource masterDataSource;    private Map
slaveDataSourceMap;
 
 
重写其中的getConnection()方法:
 
 
@Override    public Connection getConnection() throws SQLException {        return determineDataSource().getConnection();    }    @Override    public Connection getConnection(String username, String password) throws SQLException {        return determineDataSource().getConnection(username, password);    }
 
通过determineDataSource()方法来决定使用写库还是从库,对于多个从库来说,可以采用其他算法来支持,也可以根据线程ID,让同一个线程能够使用同一从库(当前实现并没有这么做):
 
public DataSource determineDataSource() {        if (masterSlaveDataSourceDecision.isChoiceWrite()) {            log.debug("current determine write datasource");            return masterDataSource;        } else if (masterSlaveDataSourceDecision.isChoiceNone()) {            log.debug("no choice read/write, default determine write datasource");            return masterDataSource;        } else {            return selectReadDataSource();        }    }
 
 
定义切换数据源使用到的切面方法,当进行到需要启动事务的方法时,根据需要选择。我们一般会定义一个txAdvice,用于声明式事务的传播属性以及readonly属性,如果我们需要使用到该属性,需要利用spring的Bean加载完成通知,实现BeanPostProcessor接口中的postProcessAfterInitialization方法
 
public class MasterSlaveDataSourceProcessor implements BeanPostProcessor {    private Map
readWriteMethodMap = new HashMap
();private String txAdviceName; private MasterSlaveDataSourceDecision masterSlaveDataSourceDecision; public void setTxAdviceName(String txAdviceName) { this.txAdviceName = txAdviceName; } public void setMasterSlaveDataSourceDecision(MasterSlaveDataSourceDecision masterSlaveDataSourceDecision) { this.masterSlaveDataSourceDecision = masterSlaveDataSourceDecision; }@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (txAdviceName.equalsIgnoreCase(beanName)) { try { TransactionInterceptor transactionInterceptor = (TransactionInterceptor) bean; NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource) transactionInterceptor.getTransactionAttributeSource(); Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap"); nameMapField.setAccessible(true); Map
nameMap = (Map
) nameMapField .get(transactionAttributeSource); for (Entry
entry : nameMap.entrySet()) { RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute) entry.getValue(); // 仅对read-only的处理 String methodName = entry.getKey(); if (attr.isReadOnly()) { if (forceChoiceReadWhenWrite) { // 不管之前操作是写,默认强制从读库读 (设置为NOT_SUPPORTED即可) // NOT_SUPPORTED会挂起之前的事务 attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value()); } else { // 否则 设置为SUPPORTS(这样可以参与到写事务) attr.setPropagationBehavior(Propagation.SUPPORTS.value()); } } log.info("read/write transaction process method:{} force read:{}", methodName, forceChoiceReadWhenWrite); readWriteMethodMap.put(methodName, attr.isReadOnly()); } } catch (Exception e) { throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e); } } return bean; }
 
 
由于我们的环境中允许存在多个数据源的主从库设置,也存在多个事务管理器,当然也会有多个txAdvice,这里在其中设置一个属性txAdvice名称,每个不同数据源的MasterSlaveDataSourceProcessor监听不同的txAdvice。
 
我们将其中的tx method属性分成两种类型,一种为只读(从库),一种为写(写库),将其放置到对应的readWriteMethodMap中,
 
 
 
根据切面的方法名称,以及刚才获得的readWriteMethodMap,来确定该方法是否可以读从库来减轻压力:
 
    
public Object selectDataSource(ProceedingJoinPoint pjp) throws Throwable {        if (isChoiceReadDB(pjp.getSignature().getName())) {            masterSlaveDataSourceDecision.markRead();        } else {            masterSlaveDataSourceDecision.markWrite();        }        try {            return pjp.proceed();        } catch (Throwable t) {            masterSlaveDataSourceDecision.reset();            throw t;        } finally {            masterSlaveDataSourceDecision.pop();        }    }
 
spring可以通过内置的PatternMatchUtils工具类,来进行简单匹配工作,实现最长路径匹配,找到最合适的matchName(该代码是从NameMatchTransactionAttributeSource.getTransactionAttribute()中获得)。
 
private boolean isChoiceReadDB(String methodName) {        String bestNameMatch = null;        for (String mappedName : this.readWriteMethodMap.keySet()) {            if (PatternMatchUtils.simpleMatch(mappedName, methodName)&&                    (bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) {                bestNameMatch = mappedName;            }        }        // 默认走写库        boolean currentRead = (bestNameMatch == null ? false : readWriteMethodMap.get(bestNameMatch));         // 如果当前为读库,并且设置了强制读,则忽略当前主库写状态        if (currentRead && forceChoiceReadWhenWrite) {            return true;        }        // 如果之前选择了写库,则当前使用写库        if (masterSlaveDataSourceDecision.isChoiceWrite()) {            return false;        }        return currentRead;    }
 
 
由于需要支持service方法之间的嵌套操作,MasterSlaveDataSourceDecision需要使用ThreadLocal<Stack>的方法保存当前上下文对应的数据源配置:
 
public class MasterSlaveDataSourceDecision {    public enum DataSourceType {        write, read;    }    private final ThreadLocal
> holder = new ThreadLocal
>() { @Override protected Stack
initialValue() { return new Stack<>(); } }; public void markWrite() { holder.get().push(DataSourceType.write); } public void markRead() { holder.get().push(DataSourceType.read); } public void reset() { holder.get().clear(); } public boolean isChoiceNone() { return holder.get().isEmpty(); } public boolean isChoiceWrite() { return !isChoiceNone() && DataSourceType.write == holder.get().peek(); } public boolean isChoiceRead() { return !isChoiceNone() && DataSourceType.read == holder.get().peek(); } public DataSourceType pop() { return isChoiceNone() ? null : holder.get().pop(); }}
 
 

配置方法

 
数据源定义应该使用我们定义的主从DataSource,其中包含了写库以及从库的相关配置:
 
 
 
定义transactionManager使用该数据源:
 
 
定义txAdvice,关联该transactionManager,并定义事务的传播属性,以及readOnly属性
 
 
 
定义对应的aop config,使用aop:aspect,注意要设置order,保证该interceptor在事务启动之前能够选择到对应的dataSource,
 
 
 
加入Processor,以及decision,注意DataSourceProcessor需要关联对应的decision类,以及txAdvice名称(其id名称),以保证Processor会在对应的txAdvice加载完成后使用其定义的txAttributes属性信息,用于判断该事务的方法是否读主库,或从库。
 
 
 
此外com.api.example.tx.MasterSlaveDataSourceProcessor中forceChoiceReadWhenWrite属性用于控制该执行该方法时,是否需要强制读操作(如果存在嵌套事务,将当前事务挂起)。
 
经过测试,可以满足我们的需求,达到根据service包中方法名称动态切换主从数据源的目的。
 
 
 
 
 
 

转载于:https://www.cnblogs.com/mmaa/p/5789861.html

你可能感兴趣的文章
Linux IO模式及 select、poll、epoll详解
查看>>
Log4j知识汇总
查看>>
python生成.exe文件
查看>>
PHP面向对象(OOP)----分页类
查看>>
监听SD卡状态
查看>>
vs2017 EFCore 迁移数据库命令
查看>>
serialVersionUID的作用
查看>>
liunx trac 插件使用之GanttCalendarPlugin
查看>>
(14)嵌入式软件开发工程师技能要求总结
查看>>
[hackerrank]Closest Number
查看>>
volatile关键字
查看>>
[Android] TabLayout设置下划线(Indicator)宽度
查看>>
<潭州教育>-Python学习笔记@条件与循环
查看>>
web自动化之验证码识别解决方案
查看>>
netty接收大文件的方法
查看>>
软件工程设计之四则运算
查看>>
SpringMVC @ResponseBody 406
查看>>
Partial Tree UVALive - 7190(完全背包)
查看>>
Ubuntu安装搜狗拼音教程
查看>>
Happy Number
查看>>