Bootstrap

soul数据同步(二)zookeeper同步策略

基于 zookeeper 的同步原理很简单,主要是依赖  的 watch 机制, 会监听配置的节点, 在启动的时候,会将数据全量写入 ,后续数据发生变更时,会增量更新  的节点,与此同时, 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存

1. 使用zk进行数据同步

1.1 soul-admin

修改application.yml,配置zk同步相关属性

soul:
  sync:
    zookeeper:
      url: localhost:2181      
      sessionTimeout: 5000
      connectionTimeout: 2000

1.2 soul-bootstrap

修改pom.xml,引入zk依赖


         org.dromara
         soul-spring-boot-starter-sync-data-zookeeper
         ${project.version}

修改application-local.yml,配置zk同步相关属性

soul :
    sync:
        zookeeper:
             url: localhost:2181
             sessionTimeout: 5000
             connectionTimeout: 2000

2. 源码分析

2.1 soul-admin的启动分析

读取application.yml中,zk同步相关配置属性,创建ZkClient 实例

 // org.dromara.soul.admin.config.ZookeeperConfiguration
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperConfiguration {  

    /**
     * register zkClient in spring ioc.
     *
     * @param zookeeperProp the zookeeper configuration
     * @return ZkClient {@linkplain ZkClient}
     */
    @Bean
    @ConditionalOnMissingBean(ZkClient.class)
    public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
        return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout());
    }
}

// org.dromara.soul.admin.config.ZookeeperProperties
@Data
@ConfigurationProperties(prefix = "soul.sync.zookeeper")
public class ZookeeperProperties {     

    private String url;

    private Integer sessionTimeout;

    private Integer connectionTimeout;

    private String serializer;
}

创建和实例

// org.dromara.soul.admin.config.DataSyncConfiguration
@Configuration
public class DataSyncConfiguration {
    .......
    /**
     * The type Zookeeper listener.
     */
    @Configuration
    @ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
    @Import(ZookeeperConfiguration.class)
    static class ZookeeperListener {

        /**
         * Config event listener data changed listener.
         *
         * @param zkClient the zk client
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
            return new ZookeeperDataChangedListener(zkClient);
        }

        /**
         * Zookeeper data init zookeeper data init.
         *
         * @param zkClient        the zk client
         * @param syncDataService the sync data service
         * @return the zookeeper data init
         */
        @Bean
        @ConditionalOnMissingBean(ZookeeperDataInit.class)
        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
            return new ZookeeperDataInit(zkClient, syncDataService);
        }
    }

  .......
}

实现了接口,该接口定义了一系列数据变更回调方法,实现了这些方法,当数据变更时,会通过内部的写入zk

// org.dromara.soul.admin.listener.DataChangedListener
public interface DataChangedListener {

    default void onAppAuthChanged(List changed, DataEventTypeEnum eventType) {
    }
    default void onPluginChanged(List changed, DataEventTypeEnum eventType) {
    }

    default void onSelectorChanged(List changed, DataEventTypeEnum eventType) {
    }
    default void onMetaDataChanged(List changed, DataEventTypeEnum eventType) {

    }
    default void onRuleChanged(List changed, DataEventTypeEnum eventType) {
    }
}

实现了接口,应用初始化后会执行其 run()方法,当zk中没有soul相关配置时,会调用syncDataService.syncAll()

// org.dromara.soul.admin.listener.zookeeper.ZookeeperDataInit
public class ZookeeperDataInit implements CommandLineRunner {

    private final ZkClient zkClient;

    private final SyncDataService syncDataService;

    /**
     * Instantiates a new Zookeeper data init.
     *
     * @param zkClient        the zk client
     * @param syncDataService the sync data service
     */
    public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        this.zkClient = zkClient;
        this.syncDataService = syncDataService;
    }

    @Override
    public void run(final String... args) {
        String pluginPath = ZkPathConstants.PLUGIN_PARENT;
        String authPath = ZkPathConstants.APP_AUTH_PARENT;
        String metaDataPath = ZkPathConstants.META_DATA;
        if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
            syncDataService.syncAll(DataEventTypeEnum.REFRESH);
        }
    }
}

syncDataService.syncAll(),从数据库中读取plugin、selector、rule配置,通过eventPublisher,发布出去

// org.dromara.soul.admin.service.sync.SyncDataServiceImpl
@Service("syncDataService")
public class SyncDataServiceImpl implements SyncDataService {

    private final AppAuthService appAuthService;
    private final MetaDataService metaDataService;
    private final PluginService pluginService;
    private final SelectorService selectorService;
    private final RuleService ruleService;
    private final ApplicationEventPublisher eventPublisher;


    @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        appAuthService.syncData();
        List pluginDataList = pluginService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        List selectorDataList = selectorService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        List ruleDataList = ruleService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        metaDataService.syncData();
        return true;
    }

    @Override
    public boolean syncPluginData(final String pluginId) {
       .....
    }
}

DataChangedEventDispatcher接收到eventPublisher,调用DataChangedListener的回调方法,这里的listener是前面创建的实例,其会调用方法会同步数据到zk

// org.dromara.soul.admin.listener.DataChangedEventDispatcher
@Component
public class DataChangedEventDispatcher implements ApplicationListener, InitializingBean {

    private ApplicationContext applicationContext;

    private List listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}

2.2 soul-bootstrap的启动分析

创建org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService和ZkClient实例

// org.dromara.soul.spring.boot.sync.data.zookeeper.ZookeeperSyncDataConfiguration
@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {

    @Bean
    public SyncDataService syncDataService(final ObjectProvider zkClient, final ObjectProvider pluginSubscriber,
                                           final ObjectProvider> metaSubscribers, final ObjectProvider> authSubscribers) {
        log.info("you use zookeeper sync soul data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

 
    @Bean
    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
    }

}

在ZookeeperSyncDataService构造器里,执行watcher*方法,将zk数据同步到本地缓存,并对监听zk节点

// org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {

    private final ZkClient zkClient;
		
    public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
                                    final List metaDataSubscribers, final List authDataSubscribers) {
        this.zkClient = zkClient;
        ...
        watcherData();
        watchAppAuth();
        watchMetaData();
    }
  
    private void watcherData() {
        final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
        List pluginZKs = zkClientGetChildren(pluginParent);
        for (String pluginName : pluginZKs) {
            watcherAll(pluginName);
        }
        zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
            if (CollectionUtils.isNotEmpty(currentChildren)) {
                for (String pluginName : currentChildren) {
                    watcherAll(pluginName);
                }
            }
        });
    }

    private void watcherAll(final String pluginName) {
        watcherPlugin(pluginName);
        watcherSelector(pluginName);
        watcherRule(pluginName);
    }

    private void watcherPlugin(final String pluginName) {
        String pluginPath = ZkPathConstants.buildPluginPath(pluginName);
        if (!zkClient.exists(pluginPath)) {
            zkClient.createPersistent(pluginPath, true);
        }
        cachePluginData(zkClient.readData(pluginPath));
        subscribePluginDataChanges(pluginPath, pluginName);
    }

    private void watcherSelector(final String pluginName) {
       ...
    }

    private void watcherRule(final String pluginName) {
        ...
    }


    private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
        zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {

            @Override
            public void handleDataChange(final String dataPath, final Object data) {
                Optional.ofNullable(data)
                        .ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe((PluginData) d)));
            }

            @Override
            public void handleDataDeleted(final String dataPath) {
                final PluginData data = new PluginData();
                data.setName(pluginName);
                Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.unSubscribe(data));
            }
        });
    }

    .....
}

2.3 soul-admin 配置变更后源码分析

与上一篇文章分析的流程一样,不一样的是,最终处理的listener是soul-admin启动时创建的,其中的回调方法会将修改后的配置同