soul数据同步(二)zookeeper同步策略
基于 zookeeper 的同步原理很简单,主要是依赖 zookeeper 的 watch 机制:soul-web(网关,即下文的 soul-bootstrap)会监听配置的节点;soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新 zookeeper 的节点;与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。
1. 使用zk进行数据同步
1.1 soul-admin
修改application.yml,配置zk同步相关属性
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
1.2 soul-bootstrap
修改pom.xml,引入zk依赖
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
    <version>${project.version}</version>
</dependency>
修改application-local.yml,配置zk同步相关属性
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
2. 源码分析
2.1 soul-admin的启动分析
读取application.yml中,zk同步相关配置属性,创建ZkClient 实例
// org.dromara.soul.admin.config.ZookeeperConfiguration
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperConfiguration {

    /**
     * Exposes a {@link ZkClient} bean built from the bound zookeeper properties.
     * The bean is skipped when the context already holds a {@link ZkClient}.
     *
     * @param zookeeperProp the bound zookeeper properties
     * @return a {@link ZkClient} created from the url, session timeout and connection timeout
     */
    @Bean
    @ConditionalOnMissingBean(ZkClient.class)
    public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
        final String servers = zookeeperProp.getUrl();
        final Integer sessionTimeout = zookeeperProp.getSessionTimeout();
        final Integer connectionTimeout = zookeeperProp.getConnectionTimeout();
        return new ZkClient(servers, sessionTimeout, connectionTimeout);
    }
}
// org.dromara.soul.admin.config.ZookeeperProperties
/**
 * Settings bound from the {@code soul.sync.zookeeper} configuration prefix.
 * Accessors come from Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "soul.sync.zookeeper")
public class ZookeeperProperties {

    // Zookeeper connection string, e.g. "localhost:2181".
    private String url;

    // Session timeout handed to ZkClient (presumably milliseconds - TODO confirm).
    private Integer sessionTimeout;

    // Connection timeout handed to ZkClient (presumably milliseconds - TODO confirm).
    private Integer connectionTimeout;

    // NOTE(review): not read by any code shown in this excerpt; purpose to be confirmed.
    private String serializer;
}
创建 ZookeeperDataChangedListener 和 ZookeeperDataInit 实例
// org.dromara.soul.admin.config.DataSyncConfiguration
@Configuration
public class DataSyncConfiguration {
.......
/**
 * Zookeeper flavour of the data-sync wiring.
 * Active only when the property {@code soul.sync.zookeeper.url} is set; pulls in
 * {@link ZookeeperConfiguration} so that a {@link ZkClient} bean is available.
 */
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

    /**
     * Registers the listener that receives data-changed callbacks.
     * Skipped when a {@link ZookeeperDataChangedListener} bean already exists.
     *
     * @param zkClient the zk client handed to the listener
     * @return the zookeeper-backed data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
    public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
        final DataChangedListener listener = new ZookeeperDataChangedListener(zkClient);
        return listener;
    }

    /**
     * Registers the startup initializer for zookeeper data.
     * Skipped when a {@link ZookeeperDataInit} bean already exists.
     *
     * @param zkClient        the zk client handed to the initializer
     * @param syncDataService the sync data service handed to the initializer
     * @return the zookeeper data initializer
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataInit.class)
    public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        final ZookeeperDataInit initializer = new ZookeeperDataInit(zkClient, syncDataService);
        return initializer;
    }
}
.......
}
ZookeeperDataChangedListener 实现了 DataChangedListener 接口,该接口定义了一系列数据变更回调方法,ZookeeperDataChangedListener 实现了这些方法,当数据变更时,会通过内部的 zkClient 写入zk
// org.dromara.soul.admin.listener.DataChangedListener
/**
 * Callback contract for configuration changes, one method per configuration group.
 * Every method is a {@code default} no-op, so an implementation overrides only
 * the groups it handles.
 *
 * NOTE(review): the {@code List} parameters are raw in this excerpt; the element
 * type of each list is not shown here - confirm against the upstream source.
 */
public interface DataChangedListener {

    /**
     * Called for changes in the app-auth group.
     *
     * @param changed   the changed entries
     * @param eventType the kind of change
     */
    default void onAppAuthChanged(List changed, DataEventTypeEnum eventType) {
    }

    /**
     * Called for changes in the plugin group.
     *
     * @param changed   the changed entries
     * @param eventType the kind of change
     */
    default void onPluginChanged(List changed, DataEventTypeEnum eventType) {
    }

    /**
     * Called for changes in the selector group.
     *
     * @param changed   the changed entries
     * @param eventType the kind of change
     */
    default void onSelectorChanged(List changed, DataEventTypeEnum eventType) {
    }

    /**
     * Called for changes in the meta-data group.
     *
     * @param changed   the changed entries
     * @param eventType the kind of change
     */
    default void onMetaDataChanged(List changed, DataEventTypeEnum eventType) {
    }

    /**
     * Called for changes in the rule group.
     *
     * @param changed   the changed entries
     * @param eventType the kind of change
     */
    default void onRuleChanged(List changed, DataEventTypeEnum eventType) {
    }
}
ZookeeperDataInit 实现了 CommandLineRunner 接口,应用初始化后会执行其 run()方法,当zk中插件、认证、元数据三个节点都不存在(即没有soul相关配置)时,会调用syncDataService.syncAll()
// org.dromara.soul.admin.listener.zookeeper.ZookeeperDataInit
/**
 * Startup hook that triggers a full sync when zookeeper holds none of the
 * plugin, app-auth or meta-data parent nodes.
 */
public class ZookeeperDataInit implements CommandLineRunner {

    private final ZkClient zkClient;

    private final SyncDataService syncDataService;

    /**
     * Creates the initializer.
     *
     * @param zkClient        client used to probe the zookeeper nodes
     * @param syncDataService service asked to perform the full sync
     */
    public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        this.zkClient = zkClient;
        this.syncDataService = syncDataService;
    }

    /**
     * Runs a full {@code REFRESH} sync unless at least one of the three parent
     * nodes is already present.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(final String... args) {
        // Checked in the order plugin -> app-auth -> meta-data; the first hit stops the probing.
        final boolean alreadySeeded = zkClient.exists(ZkPathConstants.PLUGIN_PARENT)
                || zkClient.exists(ZkPathConstants.APP_AUTH_PARENT)
                || zkClient.exists(ZkPathConstants.META_DATA);
        if (alreadySeeded) {
            return;
        }
        syncDataService.syncAll(DataEventTypeEnum.REFRESH);
    }
}
syncDataService.syncAll():先调用appAuthService.syncData()同步认证数据,再依次读取plugin、selector、rule配置(listAll()),包装成DataChangedEvent后通过eventPublisher发布出去,最后调用metaDataService.syncData()同步元数据
// org.dromara.soul.admin.service.sync.SyncDataServiceImpl
@Service("syncDataService")
public class SyncDataServiceImpl implements SyncDataService {
private final AppAuthService appAuthService;
private final MetaDataService metaDataService;
private final PluginService pluginService;
private final SelectorService selectorService;
private final RuleService ruleService;
private final ApplicationEventPublisher eventPublisher;
/**
 * Pushes every configuration group out in the order
 * app-auth, plugin, selector, rule, meta-data.
 *
 * @param type the event type attached to each published plugin/selector/rule event
 * @return always {@code true}
 */
@Override
public boolean syncAll(final DataEventTypeEnum type) {
    // App-auth is delegated to its own service.
    appAuthService.syncData();

    // Each group is loaded and published before the next one is loaded.
    final DataChangedEvent pluginEvent = new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginService.listAll());
    eventPublisher.publishEvent(pluginEvent);

    final DataChangedEvent selectorEvent = new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorService.listAll());
    eventPublisher.publishEvent(selectorEvent);

    final DataChangedEvent ruleEvent = new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleService.listAll());
    eventPublisher.publishEvent(ruleEvent);

    // Meta-data is delegated to its own service as well.
    metaDataService.syncData();
    return true;
}
@Override
public boolean syncPluginData(final String pluginId) {
.....
}
}
DataChangedEventDispatcher 接收到 eventPublisher 发布的 DataChangedEvent 后,调用 DataChangedListener 的回调方法,这里的listener是前面创建的 ZookeeperDataChangedListener 实例,其回调方法会将数据同步到zk
// org.dromara.soul.admin.listener.DataChangedEventDispatcher
/**
 * Forwards every {@link DataChangedEvent} to all {@link DataChangedListener}
 * beans found in the application context, picking the callback from the
 * event's group key.
 */
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    // Populated in afterPropertiesSet(); unmodifiable snapshot of the listener beans.
    private List<DataChangedListener> listeners;

    /**
     * Creates the dispatcher.
     *
     * @param applicationContext context later queried for listener beans
     */
    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Dispatches the event to every registered listener.
     *
     * @param event the data changed event
     * @throws IllegalStateException when the group key has no matching callback
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            // The event source is cast to a raw List; the element type depends on the group
            // and is not shown in this excerpt.
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    /**
     * Collects all {@link DataChangedListener} beans once the dispatcher's
     * properties are set and stores them as an unmodifiable list.
     */
    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
2.2 soul-bootstrap的启动分析
创建org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService和ZkClient实例
// org.dromara.soul.spring.boot.sync.data.zookeeper.ZookeeperSyncDataConfiguration
@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {
@Bean
public SyncDataService syncDataService(final ObjectProvider zkClient, final ObjectProvider pluginSubscriber,
final ObjectProvider> metaSubscribers, final ObjectProvider> authSubscribers) {
log.info("you use zookeeper sync soul data.......");
return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
@Bean
public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
}
}
在 ZookeeperSyncDataService 构造器里,依次执行 watcherData()、watchAppAuth()、watchMetaData() 方法,将zk数据同步到本地缓存,并监听对应的zk节点
// org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
private final ZkClient zkClient;
public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
final List metaDataSubscribers, final List authDataSubscribers) {
this.zkClient = zkClient;
...
watcherData();
watchAppAuth();
watchMetaData();
}
/**
 * Watches every plugin currently listed under the plugin parent node and
 * subscribes to child changes so that plugins appearing later are watched too.
 */
private void watcherData() {
    final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
    // Child names are plugin names; List<String> is required for the typed loop below.
    List<String> pluginZKs = zkClientGetChildren(pluginParent);
    for (String pluginName : pluginZKs) {
        watcherAll(pluginName);
    }
    zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
        // The callback may deliver an empty or null child list; nothing to watch then.
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            for (String pluginName : currentChildren) {
                watcherAll(pluginName);
            }
        }
    });
}
/**
 * Sets up the plugin, selector and rule watches for one plugin, in that order.
 *
 * @param pluginName name of the plugin to watch
 */
private void watcherAll(final String pluginName) {
    watcherPlugin(pluginName);
    watcherSelector(pluginName);
    watcherRule(pluginName);
}
/**
 * Makes sure the plugin's node exists, caches its current data and subscribes
 * to later data changes on that node.
 *
 * @param pluginName name of the plugin whose node is watched
 */
private void watcherPlugin(final String pluginName) {
    final String nodePath = ZkPathConstants.buildPluginPath(pluginName);
    final boolean nodeMissing = !zkClient.exists(nodePath);
    if (nodeMissing) {
        // Create the node (and any missing parents) so it can be read and watched.
        zkClient.createPersistent(nodePath, true);
    }
    cachePluginData(zkClient.readData(nodePath));
    subscribePluginDataChanges(nodePath, pluginName);
}
private void watcherSelector(final String pluginName) {
...
}
private void watcherRule(final String pluginName) {
...
}
/**
 * Subscribes to data changes on a plugin node and relays them to the plugin
 * data subscriber, when one is present.
 *
 * @param pluginPath zookeeper path of the plugin node
 * @param pluginName plugin name used to build the payload on deletion
 */
private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
    final IZkDataListener pluginNodeListener = new IZkDataListener() {

        @Override
        public void handleDataChange(final String dataPath, final Object data) {
            // Nothing to relay unless both the payload and a subscriber are present.
            if (data == null || pluginDataSubscriber == null) {
                return;
            }
            pluginDataSubscriber.onSubscribe((PluginData) data);
        }

        @Override
        public void handleDataDeleted(final String dataPath) {
            // The deleted node carries no data, so a payload holding only the name is built.
            final PluginData removed = new PluginData();
            removed.setName(pluginName);
            if (pluginDataSubscriber != null) {
                pluginDataSubscriber.unSubscribe(removed);
            }
        }
    };
    zkClient.subscribeDataChanges(pluginPath, pluginNodeListener);
}
.....
}
2.3 soul-admin 配置变更后源码分析
与上一篇文章分析的流程一样,不一样的是,最终处理的listener是soul-admin启动时创建的 ZookeeperDataChangedListener,其中的回调方法会将修改后的配置同