造物无言却有情,每于寒尽觉春生。 —— 张维屏 《新雷》
序. 介绍
XXL-CONF 是一款分布式配置中心,特性:轻量级、秒级动态推送、多环境、跨语言、跨机房、配置监听、权限控制、版本回滚。
配置中心的作用相信大家都知道,不需要重新编译打包,不需要重启线上服务器,通过修改配置中心的配置项的数据,实时推送新的配置项数据到各个引用的项目中,实时更新。
XXL-CONF 项目文档完善,接入简单,部署容易,可高可用,高性能,使用文档如下,感兴趣的小伙伴可以看一遍,这里不在针对如何使用做阐述了。
官网链接:这是链接,点这
我们首先先思考一个问题:如何实现分布式配置中心呢
带着疑问阅读本文,本文主要剖析 XXL-CONF 1.6.2-SNAPSHOT
版本源码,通过学习优秀框架实现思路,沉淀技术积累。
1. 思考:如何实现配置中心
数据结构
首先在业务需求上,我们应该提前规划好配置中心支持哪些 “配置”,是普通的基础数据类型值,还是像 Json,或者 Yaml ,XML 这样的文件格式,也就是先确定数据结构。
存储
元配置数据肯定是需要存储:所以我们需要一个可靠、高性能,可扩展的存储系统来存储和管理配置信息,例如分布式数据库、分布式文件系统。
数据隔离
支持多环境配置数据,各个环境之间需要相互隔离,如何隔离?
表中环境字段隔离,部署单个配置中心集群,定义多套环境隔离不同环境的配置数据可以同享配置中心资源。
还是部署多配置中心集群,集群 1 指定定义环境 product,集群 2 指定定义环境 test 等,避免多集群相互影响。
客户端和配置中心的通信选型
协议支持上,节点间的通信可以选则比较常用的 HTTP 协议,配置中心将配置数据暴露为 HTTP API,客户端使用 HTTP 访问该 API 以获取配置数据。
或者使用业界成熟的应用通信技术框架,例如 Dubbo,Thrift,gRPC 等。
再或者使用专为网络通信设计的基于 NIO 的高性能框架 Netty,更好的支持高并发,高吞吐的通信场景。
支持客户端的接入方式
使用 HTTP 协议实现的接口,天然支持多端,多语言,但可以为例如 Java 语言,Spring 框架提供接入组件。
实现配置变更通知机制
当配置数据发生变更时,需要及时通知各个实例。
可以基于轮询机制,客户端定期向配置中心发送请求,查询配置是否变更,若变更主动向配置中心拉取变更数据,优缺点显而易见,优点:简单,缺点:频繁发送请求,性能、网络资源消耗大。
或者基于长连接机制,客户端和配置中心建立长连接,配置数据更新时,配置中心通过长连接实时通知客户端,显而易见的缺点就是,维护长连接的成本对服务器的资源消耗较大。
在或者引入消息队列,配置中心将配置变更信息发布到消息队列,客户端从消息队列中订阅配置变更信息,缺点显而易见,引入消息队列后增加了系统的复杂度。
再或者可以选型 ZooKeeper 或者 Etcd 等作为分布式协调服务,管理配置,通过订阅配置节点上的变更来获取配置更新。
安全的考量
为保护配置数据的安全性,需要添加访问控制机制,例如在 API 接口中添加认证、授权等机制来限制访问。
WEB 端可视化配置管理
增删改查配置直接通过 WEB 界面完成。
跨机房,异地多活
配置中心集群关系对等特性,集群各节点提供幂等的配置服务。因此异地跨机房部署时,只需要请求本机房配置中心即可,避免跨机房请求可能遇到的网络疑难杂症。实现异地多活,也可以防止同机房服务全部宕机,访问其他机房可用,达到容灾的效果。
配置中心高可用
支持集群部署可提升系统可用性。设计多级存储可有助于降级容灾,
例如一级存储: DB 做元数据存储。
二级存储:配置中心磁盘文件作为配置中心集群的镜像文件。
三级存储:客户端镜像文件,配置中心故障降级使用。
四级存储:接入配置中心的客户端自己的内存 LocalCache 数据
提升性能的同时,降低对底层配置服务的压力。
以上就是笔者对如何实现配置中心这个问题的思考,各位小伙伴也可以思考下,下面让我们来看 XXL-CONF 是如何实现的吧。
2.项目架构
2.1 项目结构
clone 下来的源码项目结构如下图所示
xxl-conf-admin 为配置中心 - 配置管理平台(包含 WEB 模块),提供可视化界面操作数据
有如下功能:环境管理、用户管理、项目管理、配置管理等
xxl-conf-core 为客户端接入的组件,支持配置中心配置项变更动态监听功能,内存级 LocalCache 保证高性能。
xxl-conf-samples 提供了两个 demo 项目,其中 xxl-conf-sample-frameles 项目为无框架版本,可以直接 main 方法启动运行,xxl-conf-sample-springboot 则是支持 Springboot 版本。
2.2 多级存储
2.2.1 一级存储-DB存储
XXL-CONF 的 DB 存储使用的是 MySQL,我们先来看下数据库的表结构。
XXL-CONF 配置数据是通过表字段隔离,通过环境和项目名称字段当做业务属性隔离,也就是说不同的环境不同项目之间的配置是隔离的。
xxl_conf_env
表为环境标识表,存放所有的环境标识。
CREATE TABLE `xxl_conf_env` (
`env` varchar(100) NOT NULL COMMENT 'Env',
`title` varchar(100) NOT NULL COMMENT '环境名称',
`order` tinyint(4) NOT NULL DEFAULT '0' COMMENT '显示排序',
PRIMARY KEY (`env`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
xxl_conf_project
表为项目描述表,存放项目标识。
CREATE TABLE `xxl_conf_project` (
`appname` varchar(100) NOT NULL COMMENT 'AppName',
`title` varchar(100) NOT NULL COMMENT '项目名称',
PRIMARY KEY (`appname`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
其中 appname 多提一嘴,每个项目拥有唯一的 appname,作为项目标识,同时作为该项目下配置的统一前缀。
xxl_conf_user
表存放着用户数据,包括账号,密码,该用户拥有的权限数据等。
CREATE TABLE `xxl_conf_user` (
`username` varchar(100) NOT NULL COMMENT '账号',
`password` varchar(100) NOT NULL COMMENT '密码',
`permission` tinyint(4) NOT NULL DEFAULT '0' COMMENT '权限:0-普通用户、1-管理员',
`permission_data` varchar(1000) DEFAULT NULL COMMENT '权限配置数据',
PRIMARY KEY (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
xxl_conf_node
表存放配置数据最新的版本,按照 env 环境和 appname 项目标识隔离。
CREATE TABLE `xxl_conf_node` (
`env` varchar(100) NOT NULL COMMENT 'Env',
`key` varchar(200) NOT NULL COMMENT '配置Key',
`appname` varchar(100) NOT NULL COMMENT '所属项目AppName',
`title` varchar(100) NOT NULL COMMENT '配置描述',
`value` varchar(2000) DEFAULT NULL COMMENT '配置Value',
PRIMARY KEY (`env`,`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
其中多提一嘴,配置的 key,创建时将会自动添加所属项目的 appname 所谓前缀,生成最终的 key。可通过客户端使用最终的 key 获取配置。
xxl_conf_node_log
表记录着配置项的所有变更操作日志。
CREATE TABLE `xxl_conf_node_log` (
`env` varchar(255) NOT NULL COMMENT 'Env',
`key` varchar(200) NOT NULL COMMENT '配置Key',
`title` varchar(100) NOT NULL COMMENT '配置描述',
`value` varchar(2000) DEFAULT NULL COMMENT '配置Value',
`addtime` datetime NOT NULL COMMENT '操作时间',
`optuser` varchar(100) NOT NULL COMMENT '操作人'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
xxl_conf_node_msg
表记录着配置项数据的变更通知,是配置中心实时通知客户端配置变更的核心。
CREATE TABLE `xxl_conf_node_msg` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`addtime` datetime NOT NULL,
`env` varchar(100) NOT NULL COMMENT 'Env',
`key` varchar(200) NOT NULL COMMENT '配置Key',
`value` varchar(2000) DEFAULT NULL COMMENT '配置Value',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.2.2 多级存储
上文中我们提到了设计多级存储可有助于降级容灾
二级存储:配置中心磁盘文件作为配置中心集群的镜像文件。
三级存储:存储客户端磁盘的镜像文件,配置中心故障降级使用。
四级存储:接入配置中心的客户端自己的内存 LocalCache 数据
多级存储的存在提升性能的同时,降低对底层配置服务的压力。
实际上 XXL-CONF 就是这么设计的,我们直接来看 XXL-CONF 多级存储的具体实现。
二级存储
是存储在配置中心服务器磁盘上的镜像文件 Properties,文件存放的路径可以通过 admin 配置中心的 application.properties 中的 xxl.conf.confdata.filepath
属性指定。
admin 配置中心在给接入客户端提供的 API:/conf/find
查询配置数据就是直接从配置中心的磁盘文件 Properties 中寻找该配置项数据。
配置中心守护线程定时推送客户端通知告知配置变更,数据的对比就是通过 key 关联的 xxl_conf_node_msg
消息,与磁盘文件 Properties 的配置项数据比对。
考虑到 value 的值可能比较大,为了提高检索效率,每一个配置项用一个单独的文件来保存,文件名由环境+ 项目名 + key组成,这样做便于查询的时候能够快速精准定位。
文件记录当前配置的值
格式如下:value=XXX
当配置被删除后文件记录着文件被删除
格式如下:value-deleted=true
三级存储
是存储在客户端磁盘的镜像文件,文件存放的路径可以通过客户端的 application.properties 中 xxl.conf.mirrorfile
属性指定。
客户端在第一次启动时,先拉取服务端指定配置项数据,存储客户端磁盘镜像文件中。
同理每一个配置项用一个单独的文件来保存。
四级存储
客户端内存级别存储,客户端在第一次启动时,先拉取服务端指定配置项数据,存储客户端磁盘镜像文件中,在添加到 localCacheRepository 中。
private static ConcurrentHashMap<String, CacheNode> localCacheRepository = null;
2.3 架构设计
2.3.1 CONF(admin)架构
我们可以首先看到 xxl-conf-admin 配置中心服务提供的配置管理的 CRUD 操作都强依赖 DB,DB 作为配置数据源存储着配置信息,配置的版本变更信息等。
其次维护一个本地磁盘文件 Properties
,作为配置中心集群配置的镜像数据,每当新增,更新,删除配置数据的时候同步更新到磁盘文件中,并广播通知每个集群节点实时刷新节点磁盘配置数据,接入方客户端long-polling 接收到配置已变更通知,主动向配置中心拉取最新配置数据。
下图来源 XXL-CONF 官网
2.3.2 客户端架构
客户端只需要使用组件封装好的 API,即可一行代码获取配置信息。
客户端内存级的 LocalCache 本地缓存,极大提升 API 层的性能,降低对配置中心集群的压力。首次加载配置、监听配置变更、底层异步周期性同步配置时,写入或更新缓存。
配置数据的本地快照文件,会周期性同步 LocalCache 中的配置数据写入到 Mirror-File 镜像文件中;当无法从配置中心获取配置,如配置中心宕机时,将会使用 Mirror-File 中的配置数据,提高系统的可用性。
因此接入方可以在高 QPS、高并发场景下使用 XXL-CONF 的客户端, 不必担心并发压力或配置中心宕机导致系统问题。
下图来源 XXL-CONF 官网
3. API 相关
admin 为 WEB 配置项可视化管理提供的相关 REST API 如下(ps:环境管理,项目管理,用户管理非本文核心,不做描述):
API:/conf/pageList
区分环境,项目名分页读取数据库中 xxl_conf_node
表中的所有配置项,以及根据 key 指定查询一个配置项。
API:/conf/delete
物理删除 xxl_conf_node
表中指定的配置项,同时记录变更操作消息存入 xxl_conf_node_msg
表中,等待守护线程定时通过 key 关联的 xxl_conf_node_msg
消息,与磁盘文件 Properties 的配置项数据比对,推送客户端通知告知配置变更。
API:/conf/add
向 xxl_conf_node
表中新增配置项,同时记录变更操作消息存入 xxl_conf_node_msg
表中,等待守护线程定时推送客户端通知告知配置变更。
API:/conf/update
更新 xxl_conf_node
表中指定的配置项,同时记录变更操作消息存入 xxl_conf_node_msg
表中,等待守护线程定时推送客户端通知告知配置变更。
admin 为接入的客户端提供的 REST API 如下:
API:/conf/find
入参:keys,支持批量查询
主要作用是查询配置数据,比较特殊的是,该 API 是直接从上文我们提及过的配置中心的磁盘文件 Properties 中寻找该配置项数据,不会查询数据库。
API:/conf/monitor
入参:keys,支持批量查询
主要作用广播通知接入客户端指定配置项是否发生变更,利用了 SpringMVC DeferredResult 特性将异步操作的结果同步返回给接入客户端,对 DeferredResult 不了解的小伙伴可以参考这篇文章《DeferredResult 扫盲》
该 API 是直接从上文我们提及过的配置中心的磁盘文件 Properties 中寻找该配置项数据,若监测到数据变更则利用 DeferredResult 的 setResult 特性将变更结果返回给接入客户端,告知配置已变更,快来请求我(配置中心)获取最新的配置项数据吧。
针对 API 笔者做了些详细描述,下面让我们来走进 XXL-CONF 源码,一探究竟
4.源码分析-项目启动
4.1 admin 配置中心启动
admin 配置中心正常启动后会启动一个向线程池提交两个线程任务。
@Override
public void afterPropertiesSet() throws Exception {
startThead();
}
public void startThead() throws Exception {
/**
* brocast conf-data msg, sync to file, for "add、update、delete"
* 监控配置是否发生变更,线程会每隔1秒就查询一次xxl_conf_node_msg表,如果该表中有数据,就说明配置有变化,就立即更新本地快照,并广播给所有客户端,每隔30秒清空一次老数据
*/
executorService.execute(() -> {
while (!executorStoped) {
try {
// new message, filter readed
List<XxlConfNodeMsg> messageList = xxlConfNodeMsgDao.findMsg(readedMessageIds);
if (messageList != null && messageList.size() > 0) {
for (XxlConfNodeMsg message : messageList) {
readedMessageIds.add(message.getId());
// sync file
// 配置发生变更,配置中心不会直接将变更的数据推送给客户端,而是告诉客户端数据有变化,需要客户端主动发起http请求调用配置中心的查询接口获取最新的配置
setFileConfData(message.getEnv(), message.getKey(), message.getValue());
}
}
// 清除老的消息
if ((System.currentTimeMillis() / 1000) % confBeatTime == 0) {
xxlConfNodeMsgDao.cleanMessage(confBeatTime);
readedMessageIds.clear();
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
// 休眠 1 秒
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
});
}
简单说明下,首先利用 Spring 容器启动执行 Bean 的初始化时机,启动两个线程任务。
第一个线程:监控配置是否发生变更,每隔 1 秒就查询一次 xxl_conf_node_msg 表,如果该表中有数据,就说明配置有变化,就立即更新本地快照,并广播给所有客户端,每隔 30 秒清空一次老数据。
配置变化广播实现如下:
private String setFileConfData(String env, String key, String value) {
// 获取文件名称
String confFileName = parseConfDataFileName(env, key);
// 通过文件名加载磁盘文件
Properties existProp = PropUtil.loadFileProp(confFileName);
// 如果存在文件,且证明没有被删除则返回文件路径
if (existProp != null
&& value != null
&& value.equals(existProp.getProperty("value"))
) {
return new File(confFileName).getPath();
}
// write
Properties prop = new Properties();
// 配置项数据为空说明已经被删除,则设置属性 value-deleted=true
if (value == null) {
prop.setProperty("value-deleted", "true");
} else {
// 否则设置设置 value 属性当前的配置项数据
prop.setProperty("value", value);
}
// 写磁盘文件到指定目录下
PropUtil.writeFileProp(prop, confFileName);
logger.info(">>>>>>>>>>> xxl-conf, setFileConfData: confFileName={}, value={}", confFileName, value);
// 从 confDeferredResultMap 中获取客户端请求,响应客户端数据更新
// brocast monitor client
List<DeferredResult> deferredResultList = confDeferredResultMap.get(confFileName);
if (deferredResultList != null) {
confDeferredResultMap.remove(confFileName);
for (DeferredResult deferredResult : deferredResultList) {
deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
}
}
return new File(confFileName).getPath();
}
上述代码都比较简单,有一处你可以能不太理解如何实现广播通知客户端呢?
其实是通过 Spring MVC 的 DeferredResult 特性,当客户端请求配置中心 /conf/monitor API 时,会将当次请求的 DeferredResult 存储在内存中,key 为文件名称,value 为 DeferredResult 集合,admin 配置中心启动后的线程任务会从 confDeferredResultMap 中获取对应的 DeferredResult,为其设置 result,同步响应客户端是否变更。
对 DeferredResult 不了解的小伙伴可以参考这篇文章《DeferredResult 扫盲》
/**
* 配置中心的广播机制利用了Spring MVC 的 DeferredResult 对象特性实现
*/
private Map<String, List<DeferredResult>> confDeferredResultMap = new ConcurrentHashMap<>();
@Override
public DeferredResult<ReturnT<String>> monitor(String accessToken, String env, List<String> keys) {
// 默认的超时时间是 30 秒,如果配置没有发生变化,则等待超时返回
DeferredResult deferredResult = new DeferredResult(confBeatTime * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));
// valid
if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.equals(accessToken)) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL.getCode(), "AccessToken Invalid."));
return deferredResult;
}
if (env == null || env.trim().length() == 0) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL.getCode(), "env Invalid."));
return deferredResult;
}
if (keys == null || keys.size() == 0) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL.getCode(), "keys Invalid."));
return deferredResult;
}
// monitor by client
for (String key : keys) {
// invalid key, pass
if (key == null || key.trim().length() < 4 || key.trim().length() > 100
|| !RegexUtil.matches(RegexUtil.abc_number_line_point_pattern, key)) {
continue;
}
// monitor each key
String fileName = parseConfDataFileName(env, key);
List<DeferredResult> deferredResultList = confDeferredResultMap.get(fileName);
if (deferredResultList == null) {
deferredResultList = new ArrayList<>();
confDeferredResultMap.put(fileName, deferredResultList);
}
deferredResultList.add(deferredResult);
}
return deferredResult;
}
第二个线程任务,从数据库加载全量数据,考虑到数据量可能比较大,线程会间隔 30 秒查询一次xxl_conf_node
表,如果数据有变化,就立即更新本地快照并广播给所有客户端
/**
* sync total conf-data, db + file (1+N/30s)
*
* clean deleted conf-data file
*
* 从数据库加载全量数据,考虑到数据量可能比较大,线程会间隔30秒查询一次xxl_conf_node表.如果数据有变化,就立即更新本地快照并广播给所有客户端
*/
executorService.execute(() -> {
while (!executorStoped) {
// align to beattime
try {
long sleepSecond = confBeatTime - (System.currentTimeMillis() / 1000) % confBeatTime;
if (sleepSecond > 0 && sleepSecond < confBeatTime) {
TimeUnit.SECONDS.sleep(sleepSecond);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
// sync registry-data, db + file
int offset = 0;
int pagesize = 1000;
List<String> confDataFileList = new ArrayList<>();
List<XxlConfNode> confNodeList = xxlConfNodeDao.pageList(offset, pagesize, null, null, null);
while (confNodeList != null && confNodeList.size() > 0) {
for (XxlConfNode confNoteItem : confNodeList) {
// 配置发生变更,配置中心不会直接将变更的数据推送给客户端,而是告诉客户端数据有变化,需要客户端主动发起http请求调用配置中心的查询接口获取最新的配置
String confDataFile = setFileConfData(confNoteItem.getEnv(), confNoteItem.getKey(), confNoteItem.getValue());
// collect confDataFile
confDataFileList.add(confDataFile);
}
offset += 1000;
confNodeList = xxlConfNodeDao.pageList(offset, pagesize, null, null, null);
}
// clean old registry-data file
cleanFileConfData(confDataFileList);
logger.debug(">>>>>>>>>>> xxl-conf, sync totel conf data success, sync conf count = {}", confDataFileList.size());
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(confBeatTime);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
});
生成文件名字的方法如下:
private String parseConfDataFileName(String env, String key) {
// fileName
String fileName = confDataFilePath
.concat(File.separator).concat(env)
.concat(File.separator).concat(key)
.concat(".properties");
return fileName;
}
4.2 客户端启动
客户端正常启动,从 properties 中获取系统配置填充到 XxlConfConfig 配置类中。
@Configuration
public class XxlConfConfig {
private Logger logger = LoggerFactory.getLogger(XxlConfConfig.class);
@Value("${xxl.conf.admin.address}")
private String adminAddress;
@Value("${xxl.conf.env}")
private String env;
@Value("${xxl.conf.access.token}")
private String accessToken;
@Value("${xxl.conf.mirrorfile}")
private String mirrorfile;
@Bean
public XxlConfFactory xxlConfFactory() {
XxlConfFactory xxlConf = new XxlConfFactory();
xxlConf.setAdminAddress(adminAddress);
xxlConf.setEnv(env);
xxlConf.setAccessToken(accessToken);
xxlConf.setMirrorfile(mirrorfile);
logger.info(">>>>>>>>>>> xxl-conf config init.");
return xxlConf;
}
}
利用 Spring 容器启动机制首先启动工厂,来看 XxlConfFactory 类初始化方法
XxlConfBaseFactory.init(adminAddress, env, accessToken, mirrorfile);
public class XxlConfBaseFactory {
/**
* init
*
* @param adminAddress
* @param env
*/
public static void init(String adminAddress, String env, String accessToken, String mirrorfile) {
// init remote util
XxlConfRemoteConf.init(adminAddress, env, accessToken);
// init mirror util
XxlConfMirrorConf.init(mirrorfile);
// init cache + thread, cycle refresh + monitor
XxlConfLocalCacheConf.init();
// listener all key change
XxlConfListenerFactory.addListener(null, new BeanRefreshXxlConfListener());
}
/**
* destory
*/
public static void destroy() {
XxlConfLocalCacheConf.destroy(); // destroy
}
}
先来看 XxlConfRemoteConf.init(adminAddress, env, accessToken);
方法
简单来说就是填充字段属性,其中可以看到解析了配置中心 url,可以配置多个,用逗号分隔。
private static String adminAddress;
private static String env;
private static String accessToken;
private static List<String> adminAddressArr = null;
public static void init(String adminAddress, String env, String accessToken) {
// valid
if (adminAddress==null || adminAddress.trim().length()==0) {
throw new XxlConfException("xxl-conf adminAddress can not be empty");
}
if (env==null || env.trim().length()==0) {
throw new XxlConfException("xxl-conf env can not be empty");
}
XxlConfRemoteConf.adminAddress = adminAddress;
XxlConfRemoteConf.env = env;
XxlConfRemoteConf.accessToken = accessToken;
// parse
XxlConfRemoteConf.adminAddressArr = new ArrayList<>();
if (adminAddress.contains(",")) {
XxlConfRemoteConf.adminAddressArr.addAll(Arrays.asList(adminAddress.split(",")));
} else {
XxlConfRemoteConf.adminAddressArr.add(adminAddress);
}
}
接着执行 XxlConfMirrorConf.init(mirrorfile);
方法
设置 mirrorfile 属性,也就是客户端镜像文件(三级存储)的磁盘存放路径。
private static String mirrorfile = null;
public static void init(String mirrorfileParam) {
// valid
if (mirrorfileParam == null || mirrorfileParam.trim().length() == 0) {
throw new XxlConfException("xxl-conf mirrorfileParam can not be empty");
}
mirrorfile = mirrorfileParam;
}
接着执行 XxlConfLocalCacheConf.init();
方法
// 客户端内存级缓存 四级存储
private static ConcurrentHashMap<String, CacheNode> localCacheRepository = null;
private static Thread refreshThread;
private static boolean refreshThreadStop = false;
public static void init() {
// 创建一个本地缓存用于保存配置数据
localCacheRepository = new ConcurrentHashMap<String, CacheNode>();
// preload: mirror or remote
Map<String, String> preConfData = new HashMap<>();
// 读取客户端镜像文件配置
Map<String, String> mirrorConfData = XxlConfMirrorConf.readConfMirror();
Map<String, String> remoteConfData = null;
if (mirrorConfData != null && mirrorConfData.size() > 0) {
// 拉取镜像文件中配置集在远端的真实配置
remoteConfData = XxlConfRemoteConf.find(mirrorConfData.keySet());
}
if (mirrorConfData != null && mirrorConfData.size() > 0) {
preConfData.putAll(mirrorConfData);
}
// 如果远端加载成功,则会覆盖镜像配置
if (remoteConfData != null && remoteConfData.size() > 0) {
preConfData.putAll(remoteConfData);
}
if (preConfData != null && preConfData.size() > 0) {
for (String preKey : preConfData.keySet()) {
// PRELOAD
set(preKey, preConfData.get(preKey), SET_TYPE.PRELOAD);
}
}
// refresh thread
// 启动一个守护线程,每隔3秒钟查看一下本地缓存是否有数据,线程会在此阻塞.
// 客户会通过守护线程与服务端保持长连接,客户端会循环调用配置中心的监控接口,如果配置中心数据有变化,会立刻通知客户端,客户端接收到通知会立刻调用配置中心的查询接口获取数据,如果配置中心的数据没有变更,则默认30秒后再调用查询接口
refreshThread = new Thread(() -> {
while (!refreshThreadStop) {
try {
refreshCacheAndMirror();
} catch (Exception e) {
if (!refreshThreadStop && !(e instanceof InterruptedException)) {
logger.error(">>>>>>>>>> xxl-conf, refresh thread error.");
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>> xxl-conf, refresh thread stoped.");
});
refreshThread.setDaemon(true);
refreshThread.start();
logger.info(">>>>>>>>>> xxl-conf, XxlConfLocalCacheConf init success.");
}
简单说明下上述代码的作用
首先读取客户端镜像文件配置,接着拉取客户端镜像文件中配置集在远端的真实配置:调用配置中心 /conf/find API,首先使用客户端镜像文件中的配置,但如果远端加载成功,则会覆盖镜像配置。
最后遍历每个配置,调用 set(preKey, preConfData.get(preKey), SET_TYPE.PRELOAD);
方法,
可以看到当前 optType 为 SET_TYPE.PRELOAD,所以直接将每个配置项,封装成 CacheNode 节点,存储在客户端内存级 localCacheRepository 缓存中。
private static void set(String key, String value, SET_TYPE optType) {
localCacheRepository.put(key, new CacheNode(value));
logger.info(">>>>>>>>>> xxl-conf: {}: [{}={}]", optType, key, value);
// value updated, invoke listener
if (optType == SET_TYPE.RELOAD) {
XxlConfListenerFactory.onChange(key, value);
}
// new conf, new monitor
if (optType == SET_TYPE.SET) {
refreshThread.interrupt();
}
}
public static class CacheNode implements Serializable {
private static final long serialVersionUID = 42L;
private String value;
public CacheNode() {
}
public CacheNode(String value) {
this.value = value;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
接着会启动一个守护线程,调用 refreshCacheAndMirror();
方法
每隔 3 秒钟查看一下本地缓存是否有数据,线程会在此阻塞
private static void refreshCacheAndMirror() throws InterruptedException {
// 每隔3秒钟查看一下本地缓存是否有数据 没数据就等待3秒
if (localCacheRepository.size() == 0) {
TimeUnit.SECONDS.sleep(3);
return;
}
// 有数据,会向配置中心发送http请求查询自己需要的配置信息 查询是否配置变更
boolean monitorRet = XxlConfRemoteConf.monitor(localCacheRepository.keySet());
// 避免失败重试请求太快
if (!monitorRet) {
TimeUnit.SECONDS.sleep(10);
}
// refresh cache: remote > cache
Set<String> keySet = localCacheRepository.keySet();
if (keySet.size() > 0) {
Map<String, String> remoteDataMap = XxlConfRemoteConf.find(keySet);
// 先比对一下配置中心的配置项和本地缓存中配置项是否相同
if (remoteDataMap != null && remoteDataMap.size() > 0) {
for (String remoteKey : remoteDataMap.keySet()) {
String remoteData = remoteDataMap.get(remoteKey);
CacheNode existNode = localCacheRepository.get(remoteKey);
// 如果相同就直接忽略不处理
if (existNode != null && existNode.getValue() != null && existNode.getValue().equals(remoteData)) {
logger.debug(">>>>>>>>>> xxl-conf: RELOAD unchange-pass [{}].", remoteKey);
}
// 如果本地缓存仓库中没有该key,或该key的值为空,或该key的值有变化
else {
// 更新本地缓存
set(remoteKey, remoteData, SET_TYPE.RELOAD);
}
}
}
}
// refresh mirror: cache > mirror
// 将缓存中的配置同步到镜像文件
Map<String, String> mirrorConfData = new HashMap<>();
for (String key : keySet) {
CacheNode existNode = localCacheRepository.get(key);
mirrorConfData.put(key, existNode.getValue() != null ? existNode.getValue() : "");
}
XxlConfMirrorConf.writeConfMirror(mirrorConfData);
logger.debug(">>>>>>>>>> xxl-conf, refreshCacheAndMirror success.");
}
当客户端本地缓存中有数据的时候,就会被守护线程扫描到,会向配置中心发送 http 请求查询自己需要的配置信息,查询到配置后,它会先比对一下配置中心的配置项和本地缓存中配置项是否相同,如果相同就直接忽略不处理,如果不相同,说明有变化,如果本地缓存仓库中没有该key,或该key的值为空,或该key的值有变化,则更新本地缓存,最后将缓存中的配置同步到镜像文件。
而客户会通过守护线程与服务端保持长连接,客户端会循环调用配置中心的监控接口,如果配置中心数据有变化会立刻通知客户端,客户端接收到通知会立刻调用配置中心的查询接口获取数据,如果配置中心的数据没有变更,则默认 30 秒后再调用查询接口。
这里需要注意的是:如果配置中心增加了新的配置,客户端是不会收到通知的,因为客户端每次请求接口只拉取自己所使用到的配置
接着回到 XxlConfBaseFactory#init()
方法,继续执行 XxlConfListenerFactory.addListener(null, new BeanRefreshXxlConfListener());
方法
注册监听器,支持监听配置变更事件,例如可据此实现动态刷新JDBC连接池等高级功能场景,如下使用方式
XxlConfClient.addListener("default.key01", new XxlConfListener(){
@Override
public void onChange(String key, String value) throws Exception {
logger.info("配置变更事件通知:{}={}", key, value);
}
});
来看下源码实现
public class XxlConfListenerFactory {
private static Logger logger = LoggerFactory.getLogger(XxlConfListenerFactory.class);
/**
* xxl conf listener repository
*/
private static ConcurrentHashMap<String, List<XxlConfListener>> keyListenerRepository = new ConcurrentHashMap<>();
private static List<XxlConfListener> noKeyConfListener = Collections.synchronizedList(new ArrayList<XxlConfListener>());
/**
* add listener and first invoke + watch
*
* @param key empty will listener all key
* @param xxlConfListener
* @return
*/
public static boolean addListener(String key, XxlConfListener xxlConfListener){
if (xxlConfListener == null) {
return false;
}
if (key==null || key.trim().length()==0) {
// listene all key used
noKeyConfListener.add(xxlConfListener);
return true;
} else {
// first use, invoke and watch this key
try {
String value = XxlConfClient.get(key);
xxlConfListener.onChange(key, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
// listene this key
List<XxlConfListener> listeners = keyListenerRepository.get(key);
if (listeners == null) {
listeners = new ArrayList<>();
keyListenerRepository.put(key, listeners);
}
listeners.add(xxlConfListener);
return true;
}
}
/**
* invoke listener on xxl conf change
*
* @param key
*/
public static void onChange(String key, String value){
if (key==null || key.trim().length()==0) {
return;
}
List<XxlConfListener> keyListeners = keyListenerRepository.get(key);
if (keyListeners!=null && keyListeners.size()>0) {
for (XxlConfListener listener : keyListeners) {
try {
listener.onChange(key, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
if (noKeyConfListener.size() > 0) {
for (XxlConfListener confListener: noKeyConfListener) {
try {
confListener.onChange(key, value);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
}
}
代码比较简单,BeanRefreshXxlConfListener 实现 onChange 方法
BeanRefreshXxlConfListener#onChange()
方法具体实现如下:
@Override
public void onChange(String key, String value) throws Exception {
List<BeanField> beanFieldList = key2BeanField.get(key);
if (beanFieldList!=null && beanFieldList.size()>0) {
for (BeanField beanField: beanFieldList) {
XxlConfFactory.refreshBeanField(beanField, value, null);
}
}
}
key2BeanField 是什么呢?
// key : object-field[]
private static Map<String, List<BeanField>> key2BeanField = new ConcurrentHashMap<String, List<BeanField>>();
public static void addBeanField(String key, BeanField beanField){
List<BeanField> beanFieldList = key2BeanField.get(key);
if (beanFieldList == null) {
beanFieldList = new ArrayList<>();
key2BeanField.put(key, beanFieldList);
}
for (BeanField item: beanFieldList) {
if (item.getBeanName().equals(beanField.getBeanName()) && item.getProperty().equals(beanField.getProperty())) {
return; // avoid repeat refresh
}
}
beanFieldList.add(beanField);
}
它实际上是在 Spring 生命周期,postProcessPropertyValues
时设置,用来支持 Spring XML 占位符的方式动态更新配置,具体实现如下:
@Override
public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {
// 2、XML('$XxlConf{...}'):resolves placeholders + watch
if (!beanName.equals(this.beanName)) {
PropertyValue[] pvArray = pvs.getPropertyValues();
for (PropertyValue pv : pvArray) {
if (pv.getValue() instanceof TypedStringValue) {
String propertyName = pv.getName();
String typeStringVal = ((TypedStringValue) pv.getValue()).getValue();
if (xmlKeyValid(typeStringVal)) {
// object + property
String confKey = xmlKeyParse(typeStringVal);
// 向配置中心发送http请求查询数据,先将查询到的数据放入本地缓存,然后通过反射给这些字段赋值
String confValue = XxlConfClient.get(confKey, "");
// resolves placeholders
BeanRefreshXxlConfListener.BeanField beanField = new BeanRefreshXxlConfListener.BeanField(beanName, propertyName);
//refreshBeanField(beanField, confValue, bean);
Class propClass = String.class;
for (PropertyDescriptor item: pds) {
if (beanField.getProperty().equals(item.getName())) {
propClass = item.getPropertyType();
}
}
Object valueObj = FieldReflectionUtil.parseValue(propClass, confValue);
pv.setConvertedValue(valueObj);
// watch
BeanRefreshXxlConfListener.addBeanField(confKey, beanField);
}
}
}
}
return super.postProcessPropertyValues(pvs, pds, bean, beanName);
}
如果扫描到 XML 配置格式满足 '$XxlConf{...}',通过 key 先从客户端四级存储 localCacheRepository 中获取,获取不到则向配置中心请求 /conf/find API 查询数据,先将查询到的数据放入本地缓存,然后通过反射给这些字段赋值。
XxlConfFactory.refreshBeanField(beanField, value, null);
实现如下
主要是支持 Spring XML 占位符的方式动态更新配置
// XxlConfFactory.java
public static void refreshBeanField(final BeanRefreshXxlConfListener.BeanField beanField, final String value, Object bean){
if (bean == null) {
// 已优化:启动时禁止实用,getBean 会导致Bean提前初始化,风险较大;
bean = XxlConfFactory.beanFactory.getBean(beanField.getBeanName());
}
if (bean == null) {
return;
}
BeanWrapper beanWrapper = new BeanWrapperImpl(bean);
// property descriptor
PropertyDescriptor propertyDescriptor = null;
PropertyDescriptor[] propertyDescriptors = beanWrapper.getPropertyDescriptors();
if (propertyDescriptors!=null && propertyDescriptors.length>0) {
for (PropertyDescriptor item: propertyDescriptors) {
if (beanField.getProperty().equals(item.getName())) {
propertyDescriptor = item;
}
}
}
// refresh field: set or field
if (propertyDescriptor!=null && propertyDescriptor.getWriteMethod() != null) {
beanWrapper.setPropertyValue(beanField.getProperty(), value); // support mult data types
logger.info(">>>>>>>>>>> xxl-conf, refreshBeanField[set] success, {}#{}:{}",
beanField.getBeanName(), beanField.getProperty(), value);
} else {
final Object finalBean = bean;
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field fieldItem) throws IllegalArgumentException, IllegalAccessException {
if (beanField.getProperty().equals(fieldItem.getName())) {
try {
Object valueObj = FieldReflectionUtil.parseValue(fieldItem.getType(), value);
fieldItem.setAccessible(true);
fieldItem.set(finalBean, valueObj); // support mult data types
logger.info(">>>>>>>>>>> xxl-conf, refreshBeanField[field] success, {}#{}:{}",
beanField.getBeanName(), beanField.getProperty(), value);
} catch (IllegalAccessException e) {
throw new XxlConfException(e);
}
}
}
});
}
}
这里是不是很简单,就是通过反射来填充字段来设置配置项,没什么可说的。
了解 Spring 都知道 postProcessPropertyValues
是否执行,实际上是通过 postProcessAfterInstantiation
控制,当 postProcessAfterInstantiation 返回 true,postProcessPropertyValues 才会被执行,在 Bean 的实例化之后回调。
// XxlConfFactory.java
@Override
public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
// 1、Annotation('@XxlConf'):resolves conf + watch
if (!beanName.equals(this.beanName)) {
ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
// 解析所有加了@XxlConf注解的字段或使用$XxlConf{}占位符配置,拿到key,
if (field.isAnnotationPresent(XxlConf.class)) {
String propertyName = field.getName();
XxlConf xxlConf = field.getAnnotation(XxlConf.class);
String confKey = xxlConf.value();
// 向配置中心发送http请求查询数据,先将查询到的数据放入本地缓存,然后通过反射给这些字段赋值
String confValue = XxlConfClient.get(confKey, xxlConf.defaultValue());
// resolves placeholders
BeanRefreshXxlConfListener.BeanField beanField = new BeanRefreshXxlConfListener.BeanField(beanName, propertyName);
refreshBeanField(beanField, confValue, bean);
// watch
if (xxlConf.callback()) {
BeanRefreshXxlConfListener.addBeanField(confKey, beanField);
}
}
}
});
}
return super.postProcessAfterInstantiation(bean, beanName);
}
通过这个方法支持了通过 @XxlConf 注解的方式获取配置,动态刷新配置。
5. 总结
至此本文完。