Bootstrap

微服务中台技术解析之项目环境隔离

背景

在项目迭代过程中,我们经常遇到需要定制调用链路的情况,比如,参与下单优化项目的应用中,下单请求希望调用到参与该项目的支付应用接口,而不是基线接口。在这种情况下,通常是用项目环境隔离的方式解决。

在实践中,我们采用了(或者环境标签)的概念来解决。参与同一个项目的各个应用都使用同一个项目标签,而调用链路会根据项目标签进行路由和转发,保证带有项目标签的请求都落在有标签的机器上。

下面我们会依次介绍几个项目环境隔离的典型案例和具体实现原理,包括dubbo项目环境隔离、网关项目环境隔离、kafka项目环境隔离,希望对大家有所帮助。

dubbo项目环境隔离

调用链路图

dubbo调用的项目环境隔离是指在发起调用时,优先选择具有相同项目标签的服务提供方,如下图所示:

在上图中,绿色线条链路表示consumer收到一个带有tag1的请求,在处理这个请求时,需要发起rpc调用,那么它会优先选择tag1 provider,如果找不到,再选择基线提供者。

蓝色线条链路表示consumer收到一个不带项目标签的请求,但是consumer本身带有项目标签tag2(附加在启动参数上),那么在处理该请求需要发起rpc调用时,它会优先选择tag2 provider,如果找不到,再选择基线提供者。

而黑色线条链路表示,不带项目标签的consumer收到了不带项目标签的请求,那么它将选择基线提供者进行rpc调用。

实现原理

dubbo项目环境隔离是基于dubbo提供的标签路由功能()实现的,dubbo标签路由基本思路是从候选提供者列表中选择跟 (或者 ) 相同tag的提供者,如果没有相同tag的提供者,再依据是否强制使用tag决定是否使用基线提供者作为兜底。代码逻辑如下:

public  List> route(List> invokers, URL url, Invocation invocation) throws RpcException {
        // filter
        List> result = new ArrayList>();

        // Dynamic param
        String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);

        if (StringUtils.isEmpty(tag)){
            String configTag = ConfigUtils.getProperties().getProperty("dubbo.provider.tag");
            if (!StringUtils.isEmpty(configTag)){
                RpcContext.getContext().setAttachment(Constants.TAG_KEY, configTag);
                tag = configTag;
            }
        }

        // Tag request
        if (!StringUtils.isEmpty(tag)) {
            // Select tag invokers first
            for (Invoker invoker : invokers) {
                // if clusterInvoker is used. we don't want to use tag to filter cluster.
                if (invoker.getUrl().getProtocol().equals("zookeeper") || tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                    result.add(invoker);
                }
            }
        }
        // If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers
        if (result.isEmpty()) {
            // Only forceTag = true force match, otherwise downgrade
            String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);
            if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
                for (Invoker invoker : invokers) {
                    // if clusterInvoker is used. we don't want to use tag to filter cluster.
                    if (invoker.getUrl().getProtocol().equals("zookeeper") || StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                        result.add(invoker);
                    }
                }
            }
        }
        return result;
    }

在具体实现中,为了支持从上游请求中获取项目标签以及从consumer机器自身获取项目标签,我们扩展实现了dubbo的RouterFactory SPI,并且优先级在TagRouter之前:

public class TagFixRouter extends AbstractRouter {

    /**
     * make sure this router is ahead of default tag router, see com.alibaba.dubbo.rpc.cluster.router.tag.TagRouter
     */
    private static final int DEFAULT_PRIORITY = 90;

    /**
     * 是否强制dubbo tag过滤生效,若集群中不存在与请求tag对应的服务,默认将降级请求tag为空的provider.
     */
    private static final String DUBBO_FORCE_TAG_ENABLE = "dubbo.force.tag.enable";

    public TagFixRouter(URL url) {
        this.priority = url.getParameter(Constants.PRIORITY_KEY, DEFAULT_PRIORITY);
        // 每次请求的时候均需要调用。
        this.url = url.addParameter(Constants.RUNTIME_KEY, "true");
    }

    @Override
    public  List> route(List> invokers, URL url, Invocation invocation) throws RpcException {
      // 这里会优先获取请求中携带的项目标签,其次才是机器自身配置的项目标签  
      String tag = ProjectEnvUtil.calcCurrentProjectEnv();
        if (!StringUtils.isEmpty(tag)){
            RpcContext.getContext().setAttachment(Constants.TAG_KEY, tag);
            String dubboForceTag = System.getProperty(DUBBO_FORCE_TAG_ENABLE,"false");
            if(Boolean.parseBoolean(dubboForceTag)){
                /*在没有匹配标签的provider情况下,默认的规则会选择标签为空的provider,在某些情况下(项目环境开发联调等)会导致bug无法及时发现,需要强制标签匹配生效*/
                RpcContext.getContext().setAttachment(Constants.FORCE_USE_TAG, "true");
            }
        }
        return invokers;
    }

}


	public static String calcCurrentProjectEnv() {
    		// requestTag 在Filter中设置
        String requestTag = ProjectEnvUtil.getRequestProjectEnv();
    		// configTag 在应用启动时候设置
        String configTag = ProjectEnvUtil.getProjectEnv();

        String tag = "";
        if (requestTag != null && !"".equals(requestTag)){
            tag = requestTag;
        }else if (configTag != null && !"".equals(configTag)){
            tag = configTag;
        }
        return tag;
    }

到这里,我们似乎解决了dubbo项目环境隔离的问题,但还有一种情况需要考虑,那就是dubbo provider本身也作为consumer调用其他服务,那么在provider收到带项目标签的请求时,如何正确地路由到下游相同标签的provider,让这个链路一次进行下去呢?

为了解决这个问题,我们扩展实现了dubbo Filter SPI,在provider侧进行拦截,将上游的tag暂存下来,处理完成本次请求后再销毁。代码逻辑如下:

@Slf4j
@Activate(group = {Constants.PROVIDER_SIDE})
public class DubboTagContextFilter implements Filter {

    @Override
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        try {
          	// 获取上游的项目标签
            String tagFromCaller = RpcContext.getContext().getAttachment(Constants.TAG_KEY);
            if(log.isDebugEnabled()){
                log.debug("[club-boot dubbo tag filter] tagFromCaller = {}, invoker = {}, invocation = {}",tagFromCaller,invoker,invocation);
            }
            if (tagFromCaller != null && !"".equals(tagFromCaller)) {
              	// 设置在ThreadLocal变量中
                ProjectEnvUtil.setRequestProjectEnv(tagFromCaller);
            }

            return invoker.invoke(invocation);

        }finally {
            ProjectEnvUtil.clearRequestProjectEnv();
        }
    }

}

注意,这里我们使用了线程局部变量ThreadLocal来暂存本次请求相关的信息(项目标签是其中一项),所以不支持异步线程池的项目环境请求(即只支持同步调用)。

网关项目环境隔离

关于网关的实践可以参看此前的文章(),网关项目环境隔离是指网关可以根据请求所带项目标签自动路由到相应服务,如下图所示:

实现原理

有了上节dubbo项目环境隔离的支持,网关侧实现项目环境调用链路则简单很多。基本思路是,根据http请求中的header或者cookie携带的项目标签(统一约定,e.g. ),在发起dubbo泛化调用前设置到attachment中即可。

	public static final List PROJECT_TAG_ROUTE = Lists.newArrayList("project-env", "project.env", "project_env");	

	public void setDubboRouteTag(GWRequest request){

        String env = getProjectEnv(request);

        if (!"".equals(env)){
            RpcContext.getContext().setAttachment(Constants.TAG_KEY, env);
        }

    }

	public String getProjectEnv(GWRequest request){
        for (String tag: PROJECT_TAG_ROUTE){
            Collection tags = request.getHeaders().get(tag);
            if (tags != null && tags.size() > 0 ){
                return String.join( ",", tags);
            }
        }

        for (String tag: PROJECT_TAG_ROUTE) {
            Collection tags = request.getCookies().get(tag);
            if (tags != null && tags.size() > 0) {
                return String.join(",", tags);
            }
        }
        return "";
    }

kafka项目环境隔离

背景

公司内部我们使用kafka作为消息中间件,在进行项目开发测试时,常会遇到这样的诉求:项目环境的机器只接收带有项目标签的消息,基线机器可以消费不带项目标签的消息,在项目环境机器不存在时,为了保证测试链路正常,基线机器也可以消费带项目标签的消息。

总体架构图

上图展示了三种调用链路:

case 1 : 基础生产者发送消息,存在基础消费者进行消费

case 2 : 带tag1的生产者发送消息,存在tag1的消费者进行消费

case 3 : 带tag2的生产者发送消息,不存在对应tag2的消费者,故由基础消费者进行消费

实现原理

要实现kafka消息隔离,重点要解决几个问题:

  • 如何识别带项目标签的消费者

  • 如何保证消息投递到项目环境机器

  • 消息如何带上项目标签以及如何验证项目标签

先看识别项目标签消费者的问题。

看过前面两节可以知道,我们可以从上游请求中或者应用启动时候的配置参数中获取项目标签,那么在kafka消费者初始化成功后就可以上传topic与消费者tag的映射关系,如上面示意图中的绿色方框所示,代码实现如下:

public class ClubBootKafkaListener {

    @EventListener
    public void registerKafkaConsumerTagListener(ConsumerStartedEvent event){
        GenericMessageListenerContainer listenerContainer = event.getSource(GenericMessageListenerContainer.class);
        ContainerProperties containerProperties = listenerContainer.getContainerProperties();
        String[] consumedTopics = containerProperties.getTopics();
        if(consumedTopics == null){
            return;
        }
        for(String topic : consumedTopics){
            KafkaConsumerRegistry.getInstance().registerTagConsumer(topic, KafkaConfigUtil.getProjectEnv());
        }
    }

}

接着看如何保证消息投递到项目环境机器。

我们知道kafka以group.id作为消费组唯一标示,每个消息只会往group.id投递一次,所以项目环境的group.id要跟基线环境区分开。为此,我们将应用配置的group.id加上项目标签作为项目环境下的group.id ( e.g. mykafkaGroup@tag1 ),保证消息始终投递一份到项目环境。在实践中这个group.id的修改是自动进行的(由统一接入框架中的kafka自动配置完成,在消费初始化前进行),不需要业务方修改配置。

最后看如何让消息带上标签并且验证标签。

我们知道kafka提供了拦截机制,在消息发送前()和消费前进行拦截(),那么我们只需要在发送前拦截附加项目标签,在消费前拦截验证项目标签即可。代码如下:

public class TagProducerInterceptor implements ProducerInterceptor {


    /**
     * 项目标签不为空,则添加 header(tag=xxx)
     *
     * @param record
     * @return
     */
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
    		String tag = ProjectEnvUtil.calcCurrentProjectEnv();
        if(log.isDebugEnabled()){
           log.debug("生产端,项目环境tag="+tag);
        }
        if (!StringUtils.isEmpty(tag)) {
           Headers headers = record.headers();
           // 消息头附加项目标签
           headers.add(KafkaEnvConstant.KAFKA_INTERCEPT_TAG_NAME, tag.getBytes(StandardCharsets.UTF_8));
        }
        return record;
    }
}


public class TagConsumerInterceptor implements ConsumerInterceptor {

    /**
     * 是否需要过滤消息:线上环境不需要拦截
     */
    private boolean needIntercept(){
        return KafkaConfigUtil.isTestEnv() && Boolean.parseBoolean(KafkaConfigUtil.getProperty(KafkaConfigUtil.PROJECT_CONSUME, "true"));
    }

    @Override
    public ConsumerRecords onConsume(ConsumerRecords consumerRecords) {
        if (!needIntercept()){
            if(log.isDebugEnabled()){
                log.debug("客户端[" + Thread.currentThread().getName() + "]消费拦截器静默");
            }
            return consumerRecords;
        }

        try {
            String envTag = ProjectEnvUtil.calcCurrentProjectEnv();
            Map>> filterMapRecords = new HashMap<>(8);
            Set topicPartitions = consumerRecords.partitions();
            for (TopicPartition topicPartition : topicPartitions) {
                List> filterRecords = new ArrayList<>();
                for (ConsumerRecord record : consumerRecords.records(topicPartition)) {
                    Headers headers = record.headers();
                    Header header = headers.lastHeader(KafkaEnvConstant.KAFKA_INTERCEPT_TAG_NAME);
                    String msgTag = null;
                    if(header != null){
                        msgTag = new String(header.value(), StandardCharsets.UTF_8);
                    }
                    if(log.isDebugEnabled()){
                        log.debug("消费端,msg标签tag={}, 项目环境tag={}",msgTag,envTag);
                    }
                    //基础环境且消息头不带标  或者 项目环境标签和消息头标签相等
                    if(StringUtils.isEmpty(envTag) && header == null || envTag.equalsIgnoreCase(msgTag)){
                        filterRecords.add(record);
                    } else if(StringUtils.isEmpty(envTag) && header != null){
                    		//基础环境且该msgTag无注册消费者
                        if( !KafkaConsumerRegistry.getInstance().existConsumerByTag(record.topic(),msgTag)){
                            filterRecords.add(record);
                        }
                    }
                }
                if (filterRecords.size() > 0) {
                    filterMapRecords.put(topicPartition, filterRecords);
                }
            }
            if(log.isDebugEnabled()){
                log.debug("[kafka消费端拦截器]kafka consumer tag filter result, original = {}, filtered = {}", Lists.newArrayList(consumerRecords.iterator()),filterMapRecords);
            }
            return new ConsumerRecords<>(filterMapRecords);
        } catch (Exception e) {
            log.error("intercept kafka record encounter exception.",e);
        }
        return consumerRecords;
    }
}

上面源码中,略加复杂的是验证项目标签逻辑,我们具体解释下。首先获取项目环境标签(来自上游请求或者机器启动参数),其次对每个消息,抽取其消息头中的项目标签,然后进行判断:

  • 消息不带标签,且机器为基础环境,允许消费,对应上面的case 1

  • 消息带标签,且机器为项目环境,二者相等,允许消费,对应上面的 case 2

  • 消息带标签,且机器为基础环境,但是此时没有注册的对应标签的消费者,则允许消费,对应上面的case 3

至此,我们将实现kafka消息隔离所涉及的问题解决完毕。

但需要注意的一点是,在消息拦截后,被过滤掉的消息不会提交offset,因此会使得项目标签消费组的lag不一定总是0,e.g. 生产者向某个topic发送了100个消息,其中只有30个是带标签的,那么项目标签消费组就只会消费到30个消息,lag为70。

总结

项目环境隔离是在研发迭代中是一项非常重要的功能,如何将常用组件串联起完整的项目调用链路也是比较有挑战的工作。本文介绍了在项目开发中涉及环境隔离的几个常用点,包括dubbo项目环境隔离,网关项目环境隔离,kafka项目环境隔离,并介绍了以项目标签为核心的的设计方案。

如有疑问,欢迎共同探讨。