动态数据源切换:核心代码实现手册
文档说明:本文档是《动态数据源切换架构设计》的实现篇,深入剖析核心类的代码实现细节。建议先阅读架构设计文档以理解整体设计思想。
一、核心类概览
| 类名 | 核心职责 | 对应架构层级 |
|---|---|---|
ConnectionConfig | DTO,承载外部数据库的连接信息(URL/User/Pwd)。 | L1 应用层 |
@DynamicSource | 注解,标记需要进行数据源切换的方法。 | L1 应用层 |
ContextSwitchAspect | AOP切面,拦截注解,负责上下文的设置与清理。 | L2 拦截层 |
DynamicRoutingEngine | 核心引擎,继承 SpringAbstractRoutingDataSource,管理连接池全生命周期。 | L3 路由层 / L4 资源层 |
二、契约定义 (Contract)
2.1 配置对象 (ConnectionConfig)
这是一个纯粹的数据传输对象(DTO),利用 Lombok 简化了代码。它是业务层与底层数据源之间的“协议”。
@SuperBuilder(toBuilder=true)@Getter@AllArgsConstructor@NoArgsConstructorpublicclassConnectionConfig{StringdriverClassName;// 驱动类,如 com.mysql.cj.jdbc.DriverStringurl;// JDBC URLStringuserName;// 用户名Stringpassword;// 密码}2.2 切换注解 (@DynamicSource)
用于标记在 Service 或 DAO 层的方法上,声明该方法需要连接到外部数据源。
@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public@interfaceDynamicSource{StringDEFAULT_GROUP="default_group";// 数据源分组,用于日志或监控Stringvalue()defaultDEFAULT_GROUP;}三、切面拦截 (The Interceptor)
ContextSwitchAspect是整个机制的入口。它利用 Spring AOP 环绕通知(Around Advice)接管了方法的执行。
3.1 核心拦截逻辑 (around)
@Around("@annotation(dynamicSource)")publicObjectaround(ProceedingJoinPointjoinPoint,DynamicSourcedynamicSource)throwsThrowable{// 1. 【提取参数】从方法参数中寻找 Config 对象ConnectionConfigconfig=fetchConfig(joinPoint);Assert.notNull(config,"connection config is null");// 2. 【URL 转换】处理内网/外网地址映射(可选逻辑)config=config.toBuilder().url(normalizeUrl(config.getUrl())).build();// 3. 【事务检查】关键安全机制ensureTransactionSafety(config.getUrl());try{// 4. 【激活数据源】创建连接池并绑定到 ThreadLocalroutingEngine.activateConnection(dynamicSource.value(),config);// 5. 【执行业务逻辑】returnjoinPoint.proceed();}finally{// 6. 【资源清理】必须执行,防止 ThreadLocal 污染routingEngine.clearCurrentContext();}}3.2 事务安全检查 (ensureTransactionSafety)
这是架构设计中“禁止跨库事务”的代码落地。
🤔 为什么必须禁止?
Spring 的事务管理器(TransactionManager)在事务开启时,会将数据库连接(Connection)绑定到当前线程。
如果在事务执行过程中尝试切换数据源,Spring 可能会继续复用旧的连接(导致数据写入错误的库),或者抛出连接不可用的异常。
因此,必须在切面层提前拦截,确保“在事务中不能切换数据源”。
privatevoidensureTransactionSafety(StringtargetUrl){// 1. 如果当前不在事务中,直接放行 (安全)if(!TransactionSynchronizationManager.isActualTransactionActive()){return;}// 2. 获取当前线程绑定的数据库连接资源ConnectionHolderconnectionHolder=(ConnectionHolder)TransactionSynchronizationManager.getResourceMap().get(routingEngine);// 3. 校验:如果已经持有连接,必须保证 URL 一致if(connectionHolder!=null){Connectionconn=connectionHolder.getConnection();StringcurrentUrl=conn.getMetaData().getURL();// 如果事务已经开启在 DB-A,但当前方法请求 DB-B,这是危险操作,必须报错if(!Objects.equals(targetUrl,currentUrl)){thrownewRuntimeException("禁止在事务中切换数据源:事务已绑定到 "+currentUrl+",但试图切换至 "+targetUrl);}}}四、动态路由引擎 (The Engine)
DynamicRoutingEngine是最复杂的类,它集成了Spring 路由、连接池工厂和LRU 缓存三大功能。
4.1 线程上下文管理 (ThreadLocal)
🤔 为什么要用 ThreadLocal?
DynamicRoutingEngine继承自 Spring 的AbstractRoutingDataSource。
它的核心路由方法determineCurrentLookupKey()是无参的。
这意味着我们无法通过方法参数直接把“当前要用哪个数据库”传递进去。
因此,ThreadLocal成为了唯一的“隐式通道”,用于将 AOP 层解析出的 Data Source Key 传递给底层的路由方法。
// 存储当前线程的数据源 Key (MD5值)privatestaticfinalThreadLocal<String>contextHolder=newThreadLocal<>();// AOP 调用此方法设置 KeypublicvoidactivateConnection(Stringgroup,ConnectionConfigconfig){// ... 创建或获取连接池逻辑 ...contextHolder.set(context.getLookupKey());}// AOP 调用此方法清理 KeyvoidclearCurrentContext(){contextHolder.remove();MDC.remove("ds_name");// 清理日志上下文}4.2 Spring 路由钩子 (determineCurrentLookupKey)
这是AbstractRoutingDataSource定义的抽象方法。ORM 框架(如 MyBatis)在请求DataSource.getConnection()时,Spring 会自动回调此方法来决定返回哪个具体的 DataSource。
@OverrideprotectedObjectdetermineCurrentLookupKey(){// 1. 从 ThreadLocal 获取 KeyStringkey=contextHolder.get();// 2. 如果 Key 为空,返回 null (Spring 会使用默认数据源)if(key==null){returnnull;}// 3. 简单的校验与日志RoutingContextcontext=lookupKeyMap.get(key);if(context!=null){// 刷新活跃时间,用于 LRUcontext.refreshLastActiveTime();// 设置 MDC,让日志中包含数据源名称MDC.put(MDC_KEY,context.getLookupKey());}returnkey;}4.3 双 Key 索引与连接池复用
为了解决隐私安全与去重的矛盾,我们设计了双 Key 机制。我们可以把它比作“身份证”与“房卡”的关系:
- LongKey (身份证+详细信息):
- 内容:
category_url_username_password_driver(包含密码等所有细节)。 - 作用:只在“办理入住”(创建连接池)时使用。
- 逻辑:系统拿着这个详细清单去查:“这位客人(这个配置)以前来过吗?”。如果来过,直接复用旧房间;没来过,才开新房间。
- 内容:
- LookupKey (房卡/房间号):
- 内容:
MD5(LongKey)(一串看不出原始信息的短字符)。 - 作用:日常通行证。
- 价值:
- 安全:你拿着房卡(LookupKey)在系统里到处走(存入 ThreadLocal、打印日志),别人捡到了也无法反推出你的银行卡密码(数据库密码)。
- 轻便:MD5 长度固定,做 Map 索引比长字符串更快。
- 内容:
// Map 1: 全量 Key -> Context (用于去重)privatefinalMap<String,RoutingContext>configMap=newConcurrentHashMap<>();// Map 2: MD5 Key -> Context (用于路由查找)privatefinalMap<String,RoutingContext>lookupKeyMap=newConcurrentHashMap<>();publicvoidactivateConnection(...){// 1. 生成包含密码的全量 KeyStringlongKey=generateUniqueKey(group,config);// 2. 先查缓存 (是否存在该连接池)RoutingContextcontext=configMap.get(longKey);if(context==null){// 3. 如果不存在,创建新连接池DataSourcepool=createConnectionPool(config);// 4. 生成短 Key (MD5),用于后续的路由和日志StringlookupKey=md5(longKey);context=RoutingContext.builder().longKey(longKey).lookupKey(lookupKey).build();// 5. 注册到 Spring 的 targetDataSources Map 中registerDataSource(context,pool);}// 6. 绑定短 Key 到当前线程 (安全)contextHolder.set(context.getLookupKey());}五、生命周期管理 (Lifecycle & LRU)
为了防止无限创建连接池导致 OOM,系统通过定时任务调用evictExpiredDataSources进行清理。
5.1 LRU 驱逐逻辑
privatebooleanisDataSourceAvailable(RoutingContextcontext,DataSourcepool){// 策略 1: 快速检查 (连接池自身状态)if(isPoolHealthy(pool)){returntrue;}// 策略 2: LRU 超时检查// 如果 (当前时间 - 最后活跃时间) > maxIdleTime (默认30分钟)DurationidleTime=Duration.between(context.getLastActiveTime(),LocalDateTime.now());if(idleTime.compareTo(MAX_IDLE_TIME)>0){returnfalse;// 标记为不可用 -> 将被移除}// 策略 3: 物理连接探活 (异步执行 SQL: SELECT 1)// ...}六、代码使用示例
6.1 定义 DAO 接口
@RepositorypublicclassDataRepository{@AutowiredprivateSqlSessionsqlSession;// 核心:打上注解,第一个参数必须是 Config@DynamicSourcepublicList<Map<String,Object>>queryExternalData(ConnectionConfigconfig,Stringsql){// 这里的 sqlSession 会自动被路由到 config 指定的数据库returnsqlSession.selectList("com.example.mapper.selectBySql",sql);}}6.2 业务层调用
publicvoidgenerateReport(){// 1. 构建配置ConnectionConfigmysqlConfig=ConnectionConfig.builder().url("jdbc:mysql://10.0.0.1:3306/bi_db").userName("admin").password("secret").driverClassName("com.mysql.cj.jdbc.Driver").build();// 2. 调用 DAO (切面会自动介入)List<Map<String,Object>>result=dataRepository.queryExternalData(mysqlConfig,"SELECT * FROM report LIMIT 10");// 3. 处理结果...}