springboot对于数据源的动态管理实践
用户通过界面配置数据源,可能包括oracle、mysql、mongodb等等;配置完数据源后,支持对于具体数据源进行实时sql查询操作。
方案
AbstractRoutingDataSource
原理
spring框架提供的AbstractRoutingDataSource类,能够帮助我门实现动态的添加数据源。所以这边我们就先一起来看下这个接口的内部实现。
- 首先我们会发现这是一个抽象类,然后我们看一下它的依赖关系。可以看到两个关键,一个是它实现了DataSource接口,这个接口我们很熟悉,就是进行sql连接的核心类。另外一个是它实现了InitializingBean接口,这个接口的作用是进行初始化。

- 我们看下这个类的注释,好的注释基本能够说明很多事情。


targetDataSources是一个map,存储所有的目标数据源,如果为空,则抛出异常。
resolvedDataSources表示一个经过解析后的所有数据源,看命名应该也能看出来。
最后把默认数据源也解析好存储起来。
- 为何做以上处理,resolveSpecifiedDataSource()这个解析方法。它这里做了一个判断,如果targetDataSources的value是string则会从Jndi数据源查找。

结论
通过上述分析,我们知道一点,就是这个AbstractRoutingDataSource类必须有默认的数据源。否则初始化阶段就会报错。然后我们也发现它内部的targetDataSources其实是一个map,存储的就是lookup key 以及对应的DataSource对象。很自然的,我们可以想到,其实 所谓的动态数据源切换,其实就是 其内部 缓存了所有目标数据源,并且通过对应的key能够找到对应的数据源。
动态数据源切换原理
- AbstractRoutingDataSource是个抽象类,它是需要子类为其实现具体的方法的,所以我们就去看下它的抽象方法。可以看到,这个类就一个抽象方法:determineCurrentLookupKey()

- 我们看下 调用这个抽象方法的地方,继续找线索。发现只有这个determineTargetDataSource()进行了调用。我们一起过下这个方法。方法的实现依然不复杂,就是获取对应的lookupkey,然后根据key去resolvedDataSource里获取解析的数据源对象,返回。

动态数据源的切换,就是子类实现具体lookupKey的实现,然后就可以切换到对应的数据源对象了。这个AbstractRoutingDataSource,其实就是做的这个事情。
AbstractRoutingDataSource 实践
小场景
对于一个新的数据源,需要实现动态添加
对于一个已经添加过的数据源,需要实现动态切换
对于一个长时间不使用的数据源,需要动态删除
数据源初始化Bean
- 前面我们在分析AbstractRoutingDataSource类的时候,有提到,必须设置一个默认数据源。这里我们利用spring boot的配置文件进行默认数据源的配置。

- 然后通过Configuration类来进行配置,具体代码如下。利用@Bean进行动态数据源类的注册。在配置的核心实现里,需要将默认的配置数据源注册上去。其中lookupKey这里随便取了个默认名,只要保证后面动态添加的数据源与其不重复就ok了。
@Configuration public class DataSourceConfigurer{ private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceConfigurer.class);
@Value("${spring.datasource.url}") private String url; @Value("${spring.datasource.username}") private String username; @Value("${spring.datasource.password}") private String password; @Value("${spring.datasource.driverClassName}") private String driverClassName;
public Map<String, Object> getProperties() { Map<String, Object> map = new HashMap<>(); map.put("driverClassName", driverClassName); map.put("url", url); map.put("username", username); map.put("password", password); return map; }
public DataSource dataSource() { DataSource dataSource = null; try { dataSource = DruidDataSourceFactory.createDataSource(getProperties()); } catch (Exception e) { LOGGER.error("Create DataSource Error : {}", e); throw new RuntimeException(); } return dataSource; }
@Bean("dynamicDataSource") public DynamicRoutingDataSource dynamicDataSource() { DynamicRoutingDataSource dynamicRoutingDataSource = new DynamicRoutingDataSource(); Map<Object, Object> dataSourceMap = new HashMap<>(1); dataSourceMap.put("default_db", dataSource()); dynamicRoutingDataSource.setDefaultTargetDataSource(dataSource()); dynamicRoutingDataSource.setTargetDataSources(dataSourceMap); return dynamicRoutingDataSource; } }
|
数据源的动态添加&动态切换
- 这里的主要做法就是继承AbstractRoutingDataSource类。
- 首先我们需要需要实现determineCurrentLookupKey()方法。前面的分析,我们已经知道该方法是获取目标数据源的lookupKey。所以这里涉及到lookupKey在上下文的存储。
- 这里我们使用ThreadLocal来实现lookupKey的管理。ThreadLocal适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景。所以很适合我们这里的场景。下面是具体的实现代码。

回到最原始的需求,结合目前获取的信息,我们可以整理下动态数据源查询的具体步骤:
- 根据数据源配置信息,判断目标数据源是否已经存在,存在执行2,不存在执行3.
- 根据配置信息,生成lookupKey,存储在上下文环境。执行4
- 执行数据源添加操作。通过生成lookupKey,存储在上下文环境。执行4
- 进入AbstractRoutingDataSource的具体实现类,获取lookupKey,根据lookupKey获取目标数据源。
以上步骤的核心,我们通过代码一步步演示。首先通过如下的代码,实现目标数据源的判断操作。这个判断也可以通过切面来实现,这边只是简单的集成在业务实现内部。
DynamicDataSourceContextHolder.setDataSourceKey(sqlInfoVO.getProjectId()); if(!DynamicRoutingDataSource.isExistDataSource(sqlInfoVO.getProjectId())) { dynamicDataSource.addDataSource(sqlInfoVO); }
|
添加数据源的方法,主要就是先构建Connection连接进行测试,然后添加进AbstractRoutingDataSource的目标数据源map里。切记,添加完毕后,要进行afterPropertiesSet()操作,相当于刷新操作。
public synchronized boolean addDataSource(SqlInfoVO sqlInfoVO) { try { Connection connection = null; try { Class.forName(sqlInfoVO.getDriverClassName()); connection = DriverManager.getConnection( sqlInfoVO.getUrl(), sqlInfoVO.getUsername(), sqlInfoVO.getPassword()); } catch (Exception e) { e.printStackTrace(); return false; } finally { if (connection != null && !connection.isClosed()) { connection.close(); } } String projectId = sqlInfoVO.getProjectId(); if (StringUtils.isBlank(projectId)) { return false; } if (DynamicRoutingDataSource.isExistDataSource(projectId)) { return true; } DruidDataSource druidDataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(beanToMap(sqlInfoVO)); druidDataSource.init(); Map<Object, Object> targetMap = DynamicRoutingDataSource.targetTargetDataSources; targetMap.put(projectId, druidDataSource); this.setTargetDataSources(targetMap); this.afterPropertiesSet(); logger.info("dataSource [{}] has been added" + projectId); } catch (Exception e) { logger.error(e.getMessage()); return false; } return true; }
|
关于获取lookupKey的实现,应该很容易了,就是通过ThreadLocal去上下文获取。这里看到进行了updateTimer()操作,这个操作是为了数据源的连接过期断开而设定的。
String lookupKey = DynamicDataSourceContextHolder.getDataSourceKey(); updateTimer(lookupKey); return lookupKey;
|
利用定时任务管理数据源的过期
根据以上的实现,其实我们已经实现了数据源的动态添加和切换操作。然而我们还需要考虑下,对于部分很少甚至只连接一次的数据源,其实是没必要一直缓存的。所以我们需要实现一个过期检测的操作。
具体的功能,这里实现的思路是:
- 在设置目标数据源的Map时,通过copy存储一份带时间戳的数据源对象。
- 每次数据源查询时,都会去更新下对应数据源的时间戳。
- 设置一个过期时间,然后利用一个定时任务,来判断数据源是否过期。如果过期,则进行移除。
这里面核心的操作涉及到三块,首先是需要构建一个带有时间戳的数据源扩展类:
public class DynamicDataSourceTimer {
private static long idlePeriodTime = 10 * 60 * 1000;
private DataSource dds;
private long lastUseTime;
public DynamicDataSourceTimer(DataSource dds) { this.dds = dds; this.lastUseTime = System.currentTimeMillis(); }
public void refreshTime() { lastUseTime = System.currentTimeMillis(); }
public boolean checkAndClose() {
if (System.currentTimeMillis() - lastUseTime > idlePeriodTime) { return true; }
return false; }
}
|
构建一个定时任务,这里我们利用spring的@Schedula注解实现:
@Scheduled(initialDelay= 10 * 60 * 1000, fixedRate= 10 * 60 * 1000) public void clearTask() { clearIdleDDS(); }
private void clearIdleDDS() { timerMap.forEach((k,v) -> { if(v.checkAndClose()) { delDatasources(k.toString()); } });
|
实现一个移除数据源的方法,其核心依然在于更新AbstractRoutingDataSource的AbstractRoutingDataSource:
public synchronized boolean delDatasources(String datasourceid) { Map<Object, Object> dynamicTargetDataSources2 = DynamicRoutingDataSource.targetTargetDataSources; if (dynamicTargetDataSources2.containsKey(datasourceid)) { Set<DruidDataSource> druidDataSourceInstances = DruidDataSourceStatManager.getDruidDataSourceInstances(); for (DruidDataSource l : druidDataSourceInstances) { if (datasourceid.equals(l.getName())) { System.out.println(l); dynamicTargetDataSources2.remove(datasourceid); DruidDataSourceStatManager.removeDataSource(l); setTargetDataSources(dynamicTargetDataSources2); super.afterPropertiesSet(); return true; } } return false; } else { return false; } }
|
小需求源码
spring.datasource.url=jdbc:mysql: &allowMultiQueries=true&useSSL=false&serverTimezone=Asia/Shanghai\ &useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false spring.datasource.username=root spring.datasource.password=root # 数据库连接池配置 spring.datasource.driverClassName=com.mysql.jdbc.Driver
|
package com.trey.dynamicdatasource.datasource;
import com.alibaba.druid.pool.DruidDataSourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource; import java.util.HashMap; import java.util.Map;
@Configuration public class DataSourceConfigurer{ private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceConfigurer.class);
@Value("${spring.datasource.url}") private String url; @Value("${spring.datasource.username}") private String username; @Value("${spring.datasource.password}") private String password; @Value("${spring.datasource.driverClassName}") private String driverClassName;
public Map<String, Object> getProperties() { Map<String, Object> map = new HashMap<>(); map.put("driverClassName", driverClassName); map.put("url", url); map.put("username", username); map.put("password", password); return map; }
public DataSource dataSource() { DataSource dataSource = null; try { dataSource = DruidDataSourceFactory.createDataSource(getProperties()); } catch (Exception e) { LOGGER.error("Create DataSource Error : {}", e); throw new RuntimeException(); } return dataSource; }
@Bean("dynamicDataSource") public DynamicRoutingDataSource dynamicDataSource() { DynamicRoutingDataSource dynamicRoutingDataSource = new DynamicRoutingDataSource(); Map<Object, Object> dataSourceMap = new HashMap<>(1); dataSourceMap.put("default_db", dataSource()); dynamicRoutingDataSource.setDefaultTargetDataSource(dataSource()); dynamicRoutingDataSource.setTargetDataSources(dataSourceMap); return dynamicRoutingDataSource; } }
|
- DynamicDataSourceContextHolder
package com.trey.dynamicdatasource.datasource;
public class DynamicDataSourceContextHolder {
private static final ThreadLocal<String> dbProjectContextHolder = new ThreadLocal<>();
public static void setDataSourceKey(String key) { dbProjectContextHolder.set(key); }
public static String getDataSourceKey() { return dbProjectContextHolder.get(); }
public static void clearDataSourceKey() { dbProjectContextHolder.remove(); }
}
|
package com.trey.dynamicdatasource.datasource;
import javax.sql.DataSource;
public class DynamicDataSourceTimer {
private static long idlePeriodTime = 10 * 60 * 1000;
private DataSource dds;
private long lastUseTime;
public DynamicDataSourceTimer(DataSource dds) { this.dds = dds; this.lastUseTime = System.currentTimeMillis(); }
public void refreshTime() { lastUseTime = System.currentTimeMillis(); }
public boolean checkAndClose() {
if (System.currentTimeMillis() - lastUseTime > idlePeriodTime) { return true; }
return false; }
}
|
package com.trey.dynamicdatasource.datasource;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSourceFactory; import com.alibaba.druid.stat.DruidDataSourceStatManager; import com.trey.dynamicdatasource.pojo.SqlInfoVO; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.cglib.beans.BeanMap; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import org.springframework.scheduling.annotation.Scheduled;
import javax.sql.DataSource; import java.sql.Connection; import java.sql.DriverManager; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap;
@Slf4j public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
private static Map<Object, Object> targetTargetDataSources = new ConcurrentHashMap<>(); private static Map<Object, DynamicDataSourceTimer> timerMap = new ConcurrentHashMap<>();
@Scheduled(initialDelay= 10 * 60 * 1000, fixedRate= 10 * 60 * 1000) public void clearTask() { clearIdleDDS(); }
private void clearIdleDDS() { timerMap.forEach((k,v) -> { if(v.checkAndClose()) { delDatasources(k.toString()); } }); }
private void updateTimer(String lookupKey) { DynamicDataSourceTimer timer = timerMap.get(lookupKey); if(timer == null) { return ; } timer.refreshTime(); }
@Override protected Object determineCurrentLookupKey() { String lookupKey = DynamicDataSourceContextHolder.getDataSourceKey(); updateTimer(lookupKey); return lookupKey; }
@Override public void setTargetDataSources(Map<Object, Object> targetDataSources) { super.setTargetDataSources(targetDataSources); DynamicRoutingDataSource.targetTargetDataSources = targetDataSources; targetDataSources.forEach((k,v) -> { DataSource dataSource = (DataSource)v; timerMap.put(k,new DynamicDataSourceTimer(dataSource)); }); }
public static boolean isExistDataSource(String key) { return targetTargetDataSources.containsKey(key); }
public synchronized boolean addDataSource(SqlInfoVO sqlInfoVO) { try { Connection connection = null; try { Class.forName(sqlInfoVO.getDriverClassName()); connection = DriverManager.getConnection( sqlInfoVO.getUrl(), sqlInfoVO.getUsername(), sqlInfoVO.getPassword()); } catch (Exception e) { e.printStackTrace(); return false; } finally { if (connection != null && !connection.isClosed()) { connection.close(); } } String projectId = sqlInfoVO.getProjectId(); if (StringUtils.isBlank(projectId)) { return false; } if (DynamicRoutingDataSource.isExistDataSource(projectId)) { return true; } DruidDataSource druidDataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(beanToMap(sqlInfoVO)); druidDataSource.init(); Map<Object, Object> targetMap = DynamicRoutingDataSource.targetTargetDataSources; targetMap.put(projectId, druidDataSource); this.setTargetDataSources(targetMap); this.afterPropertiesSet(); logger.info("dataSource [{}] has been added" + projectId); } catch (Exception e) { logger.error(e.getMessage()); return false; } return true; }
public synchronized boolean delDatasources(String datasourceid) { Map<Object, Object> dynamicTargetDataSources2 = DynamicRoutingDataSource.targetTargetDataSources; if (dynamicTargetDataSources2.containsKey(datasourceid)) { Set<DruidDataSource> druidDataSourceInstances = DruidDataSourceStatManager.getDruidDataSourceInstances(); for (DruidDataSource l : druidDataSourceInstances) { if (datasourceid.equals(l.getName())) { System.out.println(l); dynamicTargetDataSources2.remove(datasourceid); DruidDataSourceStatManager.removeDataSource(l); setTargetDataSources(dynamicTargetDataSources2); super.afterPropertiesSet(); return true; } } return false; } else { return false; } }
private <T> Map<String, Object> beanToMap(T bean) { Map<String, Object> map = new HashMap<>(); if (bean != null) { BeanMap beanMap = BeanMap.create(bean); for (Object key : beanMap.keySet()) { map.put(key+"", beanMap.get(key)); } } return map; }
}
|
package com.trey.dynamicdatasource.pojo;
import lombok.Data;
@Data public class ExtendSource{ private Long Id; private String dbName; private String dbType; private String connectionInfo; }
package com.trey.dynamicdatasource.pojo;
import lombok.Data;
@Data public class SqlInfoVO { private String username; private String password; private String url; private String driverClassName; private String projectId; }
|
package com.trey.dynamicdatasource.action;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.trey.dynamicdatasource.datasource.DynamicDataSourceContextHolder; import com.trey.dynamicdatasource.datasource.DynamicRoutingDataSource; import com.trey.dynamicdatasource.pojo.ExtendSource; import com.trey.dynamicdatasource.pojo.SqlInfoVO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier;
import java.sql.*; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional;
@Slf4j public abstract class AbstractSqlQueryStrategy implements DataBaseQuery {
@Autowired @Qualifier("dynamicDataSource") private DynamicRoutingDataSource dynamicDataSource;
protected static final String JDBC_URL = "jdbcUrl"; protected static final String USERNAME = "userName"; protected static final String PASSWORD = "password";
protected abstract String getDriverName();
@Override public Optional<Map<String,Object>> query(ExtendSource extendSource, String sql){
SqlInfoVO sqlInfoVO = createSqlInfoVO(extendSource); DynamicDataSourceContextHolder.setDataSourceKey(sqlInfoVO.getProjectId()); if(!DynamicRoutingDataSource.isExistDataSource(sqlInfoVO.getProjectId())) { dynamicDataSource.addDataSource(sqlInfoVO); }
if (sqlInfoVO == null) { log.error("mysql source setting error, please check!"); } Map<String,Object> resultMap = jdbcQuery(sql); if(resultMap.isEmpty()) { return Optional.empty(); } return Optional.of(resultMap); }
private Map<String,Object> jdbcQuery(String sql) { Statement stmt; Connection conn = null; try { conn = dynamicDataSource.getConnection(); if (conn.isClosed()) { return Collections.EMPTY_MAP; }
stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql); ResultSetMetaData md = rs.getMetaData(); int columCount = md.getColumnCount(); while(rs.next()) { Map<String,Object> rowData = new HashMap<>(); for(int i = 1; i <= columCount; i++) { rowData.put(md.getColumnName(i), rs.getObject(i)); } return rowData;
} } catch (SQLException e) { log.error(e.getMessage()); } finally { DynamicDataSourceContextHolder.clearDataSourceKey(); } return Collections.EMPTY_MAP; }
private SqlInfoVO createSqlInfoVO(ExtendSource extendSource) { String connectionInfo = extendSource.getConnectionInfo(); SqlInfoVO sqlInfoVO = new SqlInfoVO(); sqlInfoVO.setDriverClassName(getDriverName()); JSONObject jsonObject = JSON.parseObject(connectionInfo); sqlInfoVO.setUrl(jsonObject.getString(JDBC_URL)); sqlInfoVO.setPassword(jsonObject.getString(PASSWORD)); sqlInfoVO.setUsername(jsonObject.getString(USERNAME)); sqlInfoVO.setProjectId(extendSource.getDbType() + extendSource.getId()); return sqlInfoVO; } }
|
package com.trey.dynamicdatasource.action;
import com.trey.dynamicdatasource.pojo.ExtendSource;
import java.util.Map; import java.util.Optional;
public interface DataBaseQuery {
Optional<Map<String,Object>> query(ExtendSource extendSource, String sql);
}
|
package com.trey.dynamicdatasource.action;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
@Component public class DataBaseQueryContext {
@Autowired private Map<String, DataBaseQuery> contextStrategy = new ConcurrentHashMap<>();
public DataBaseQuery build(String type) { return contextStrategy.get(type); }
}
|
package com.trey.dynamicdatasource.action;
import org.springframework.stereotype.Component;
@Component("MYSQL") public class MysqlQueryStrategy extends AbstractSqlQueryStrategy {
private static final String DRIVER = "com.mysql.jdbc.Driver";
@Override protected String getDriverName() { return DRIVER; } }
|
dynamic-datasource-starts