项目情景:多个用户,每个用户都存在一个数据库资源,所有用户数据源信息存放在另一个主库表中(可以是其它数据库)
问题:采用AbstractRoutingDataSource无法统一管理数据源并与连接池实时管理,多数据源也需要实时加载不一次性获取所有用户数据库信息
解决方案:采用dynamicdatasource,在配置中只配置主数据源即用户连接信息表及通用表所在数据库资源,用户信息通过拦截器实时添加相关用户数据源, 并切换当前数据源为用户默认数据源,不存在时就直接适用主数据源(默认主数据源)
前置准备
依赖
- 多数据源依赖,获取springboot对应版本的dynamic依赖版本
<dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId> <version>${dynamic-datasource.version}</version> </dependency>
|
- 数据连接依赖,存在多个不同源数据库,则需要引入相对应的依赖版本
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency>
<dependency> <groupId>com.postgresql</groupId> <artifactId>vango-postgresql</artifactId> <version>${vango-postgresql.version}</version> </dependency>
|
- 其它依赖如连接池、nacos、spring boot等按项目需求引入
多数据源主配置
- 当前配置都只配置主数据源,以及连接池相关信息,注意连接池的配置部分在多数据源中有过封装,名称可能不一致,需要一一对照
- 以druid连接池为准
# dev-config spring: datasource: type: com.alibaba.druid.pool.DruidDataSource dynamic: druid: #全局设置 # 初始化连接大小 initial-size: 1 #最大连接池数量 max-active: 2 # 最小连接池数量 min-idle: 0 # 配置获取连接等待超时的时间 单位毫秒 max-wait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 time-between-eviction-runs-millis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 min-evictable-idle-time-millis: 300000 # 申请连接的时候检测,建议配置为true,不影响性能,并且保证安全性 test-while-idle: true # 获取连接时执行检测,建议关闭,影响性能---多数据源防止数据库关闭等 test-on-borrow: true # 归还连接时执行检测,建议关闭,影响性能 test-on-return: false #重试次数 break-after-acquire-failure: true connection-error-retry-attempts: 5 # 是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle poolPreparedStatements: true # 要启用PSCache,必须配置大于0,当大于0时, poolPreparedStatements自动触发修改为true, # 在Druid中,不会存在Oracle下PSCache占用内存过多的问题, # 可以把这个数值配置大一些,比如说100 maxOpenPreparedStatements: 20 # 连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作 keepAlive: true validationQuery: SELECT 'x'
# 配置监控统计拦截器 防火墙 日志配置 # stat监控数据库性能 # wall 用于防火墙 # 日志 slf4j logback # log4j # log4j2 # 配置多个英文逗号分隔 filters: stat,slf4j # 合并多个DruidDataSource的监控数据 use-global-data-source-stat: true # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 SQL优化 connect-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=2000 # Spring 监控,利用aop 对指定接口的执行时间,jdbc数进行记录 aop-patterns: "com.hhyunerp.service.*" ########## 配置WebStatFilter,用于采集web关联监控的数据 ########## # web-stat-filter: # enabled: true # 启动 StatFilter # url-pattern: /* # 过滤所有url # exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*" # 排除一些不必要的url # session-stat-enable: true # 开启session统计功能 ########## 配置StatViewServlet(监控页面),用于展示Druid的统计信息 ########## primary: dsConfig strict: true datasource: dsConfig: url: //需要配置连接参数,与正常参数一致 username: erptest_root password: GM2020+!@# driver-class-name: com.mysql.cj.jdbc.Driver # 初始化连接大小 initial-size: 10 # 最大连接池数量 max-active: 10 # 最小连接池数量 min-idle: 10 # pgSql: # url: jdbc:postgresql://XX:5432/grasphhmaster?searchpath=grasp,public¤tSchema=grasp&escapeSyntaxCallMode=callIfNoReturn # username: XX # password: XX # driver-class-name: org.postgresql.Driver
|
- 以springboot默认hikaricp为准
# dev-config spring: datasource: dynamic: primary: dsConfig # strict: true hikari: # 为支持catalog的数据库设置默认的catalog,默认依赖于jdbc驱动 #catalog: true # 控制客户端等待池中连接的最长毫秒数。 connection-timeout: 60000 # 指定验证连接有效性的超时时间(默认是5秒,最小不能小于250毫秒) #validation-timeout: 5000 # 连接允许被闲置在池中的最大时间 >max-lifetime会重置为0 idle!=0且小于10s会重置为10s idle-timeout: 600000 # 连接被占用的超时时间,也就是连接泄露检测的最大时间,默认是 0,0表示的是不开启泄露检测,最大值不能超过连接的最大存活时间,也就是maxLifetime #leak-detection-threshold: # 控制连接池中一个连接的最大生存时间 要小于 url配置的wait_timeout时间 max-lifetime: 800000 # 最大连接数 poolsize>idle时且timeout>0会移除timeout的空闲连接 max-pool-size: 2 # 连接池保持数据连接的最小数量 min-idle: 0 # 如果池无法成功初始化连接,则此属性控制池是否“快速失败”的时间,在此超时发生之前无法获取连接,则将引发异常 1ms # initialization-fail-timeout: # 设置一个SQL语句,该语句将在每次创建新连接之后,添加到池中之前执行 # connection-init-sql: select 1 # 心跳检测 connection-test-query: SELECT 'x' # dataSource-class-name: # dataSource-jndi-name: # 从池返回的连接的默认事务隔离级别 例如TRANSACTION_READ_COMMITTED, TRANSACTION_REPEATABLE_READ等 # transaction-isolation-name: # 自动提交 is-auto-commit: true # 指定从连接池获取到的连接在默认情况下是否是只读模式的。这个取决于数据库和应用,默认false # is-read-only: #是否在其自己的事务中隔离内部池查询 #由于这些查询通常是只读查询,所以很少需要将它们封装在自己的事务中。 # 此属性仅适用于禁用autoCommit的情况。默认值:false # is-isolate-internal-queries: # 是否注册Mbeans,默认false # is-register-mbeans: # 连接池是否可以被JMX挂起和恢复。在挂起状态下,获取连接将不会超时,会被阻塞直到连接池恢复。,默认false # is-allow-pool-suspension: # data-source-properties: # health-check-properties: datasource: dsConfig: url: jdbc:mysql:XXX/xxx?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true&allowMultiQueries=true&useAffectedRows=true&connectTimeout=60000&socketTimeout=60000&autoReconnect=true username: XX password: XX driver-class-name: com.mysql.cj.jdbc.Driver
|
数据源检查与更新(存在并发及多集群下问题解决)
并发问题
集群问题
数据源操作
- 添加数据源信息
/** * 生成当前客户数据源路由 * currentDbUUID: 当前数据源创建时间戳(检查数据源时,需要更新即创建) * * @param cusDbserver */ private String createDynamicDataSourceRouting(Integer customerId, String currentDbUUID) { log.info(">>>>>> 【添加客户数据源】: addCustomDataSource" + "\n>>>>>> customerId:{}", customerId); //查询客户数据源信息 List<TCusDbServerPO> cusDbServerQueries = cusDbserverMapper.listCustomerById(customerId); if (cusDbServerQueries.size() != 1) { throw new GoLoginException(SystemExceptionEnum.LOGIN_USERINFO_ERROR); } TCusDbServerPO cusDbserver = cusDbServerQueries.get(0); DataSourceProperty dataSourceProperty = new DataSourceProperty(); String decodePassword = null; try { decodePassword = DesUtil.hhComDecode(cusDbserver.getPassword()); } catch (Exception e) { throw new MySystemException(SystemExceptionEnum.INTERNAL_SERVER_ERROR.getCode(), e.getMessage()); } dataSourceProperty.setPoolName(cusDbserver.getDbName()); dataSourceProperty.setUsername(cusDbserver.getUsername()); dataSourceProperty.setPassword(decodePassword);
String url = dynamicDbProperties.getPgsqlUrl(cusDbserver.getInnerAddress(),cusDbserver.getInnerport(),cusDbserver.getDbName()); Integer dev = Optional.ofNullable(systemDefineProperties.getDev()).orElse(1); if (dev == 1) { //开发环境-外网 url = dynamicDbProperties.getPgsqlUrl(cusDbserver.getOuterAddress(),cusDbserver.getOuterPort(),cusDbserver.getDbName()); } dataSourceProperty.setUrl(url); dataSourceProperty.setDriverClassName(dynamicDbProperties.getPgsqlDriver()); dataSourceProperty.setLazy(true); //sqlserver需要配置 DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource; long startTime = System.currentTimeMillis(); DataSource dataSource = dataSourceCreator.createDataSource(dataSourceProperty); long time1 = System.currentTimeMillis(); String dbName = cusDbserver.getCustomerId() + cusDbserver.getDbName() + "_" + currentDbUUID; ds.addDataSource(dbName, dataSource); long time2 = System.currentTimeMillis(); log.info(">>>>>> 【数据源连接】:getDynamicRoutingDataSource" + "\n>>>>>> url: {}" + "\n>>>>>> connectionName: {}" + "\n>>>>>> createDataSource: {}" + "\n>>>>>> addDataSource: {}", url, dbName, time1-startTime,time2-startTime); return dbName; }
|
- 在拦截器中对所有请求拦截,判定是否需要进行数据源更新(同时设置当前请求的数据源为该数据源)
// 检查数据源操作(通过查询的数据库名称及用户id唯一确定): public String checkUpdateCustomDynamicSource(String dbName, Integer customId){ if (StringUtils.isBlank(dbName) || Objects.isNull(customId)) { DynamicDataSourceContextHolder.push(defaultDbName); log.info(">>>>>> 客户数据源dbName或者customId获取失败"); throw new GoLoginException("数据源信息失效,需重新登录!"); } //获取当前所有数据源 DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource; Map<String, GroupDataSource> groupDataSources = ds.getGroupDataSources(); Set<String> groups = groupDataSources.keySet(); String groupName = customId + dbName; String uuidKey = dbName + ":" + CacheKeyEnum.DYNAMIC_CURRENT_CUSTOM.getKey(); //获取连接创建时间戳 String uuid = redisService.hget(uuidKey, groupName); boolean updateCache = false; if (StringUtils.isBlank(uuid)) { uuid = String.valueOf(System.currentTimeMillis()); updateCache = true; } String newCustomConn = ""; if (groups.contains(groupName)) { GroupDataSource groupDataSource = groupDataSources.get(groupName); Set<String> dbs = groupDataSource.getDataSourceMap().keySet(); if (CollectionUtils.isNotEmpty(dbs)) { long uid1 = Long.parseLong(uuid); for (String conn : dbs) { String[] split = conn.split("_"); if (split.length != 2) { ds.removeDataSource(conn); continue; } String group = split[0]; String curUid = split[1]; if (StringUtils.isBlank(group) || StringUtils.isBlank(curUid)) { ds.removeDataSource(conn); continue; } long uid2 = Long.parseLong(curUid); if (uid2 < uid1) { ds.removeDataSource(conn); continue; } //重新创建之后更新的数据源不进行删除并使用该数据源 newCustomConn = conn; } } } if (StringUtils.isBlank(newCustomConn)) { //添加数据源 uuid = String.valueOf(System.currentTimeMillis()); newCustomConn = addCustomDataSource(customId, uuid); } if (updateCache) { redisService.hset(uuidKey, groupName, uuid, cacheProperties.getDynamicCacheTime()); } DynamicDataSourceContextHolder.push(newCustomConn); String peek = DynamicDataSourceContextHolder.peek(); log.info(">>>>>> 当前客户数据源: {}", peek); return newCustomConn; }
|
连接异常处理
- 在获取连接或者语句进行执行获取连接时,存在极小概率会获取失败,为了友好,便对异常进行了处理
//连接问题 @ExceptionHandler({CannotGetJdbcConnectionException.class, MyBatisSystemException.class}) @ResponseBody public Response jdbcexceptionHandler(Exception e) { Integer code = SystemExceptionEnum.CONN_DB_ERROR.getCode(); String errorInfoToString = ErrorProcessor.errorInfoToString(e); if (errorInfoToString.contains("CannotGetJdbcConnectionException")) { //清空redis直接重新请求 dynamicSourceService.removeCustomDynamicSource(); ErrorProcessor.errorCodeGen(code); logger.error(">>>>>> CannotGetJdbcConnectionException " + "\n>>>>>> 时间:{} " + "\n>>>>>> 详情: {}", TimeUtils.localDateToStr(LocalDateTime.now(), null), errorInfoToString); return Response.error(code, "数据连接等待获取中,请稍后重试!"); } String message = e.getMessage(); String msg = message.substring(message.lastIndexOf(":") + 1); String newMsg = ErrorProcessor.errorCodeGen(code,msg); logger.error(">>>>>> MyBatisSystemException " + "\n>>>>>> 时间:{} " + "\n>>>>>> 详情: {}", TimeUtils.localDateToStr(LocalDateTime.now(), null), errorInfoToString); return Response.error(code, newMsg); }
|