Bootstrap

SpringCloud Gateway 动态路由

何为动态路由?

动态路由即:在不进行网关应用重启的情况下,可以通过管理API或者管理UI的方式添加路由,能实时或准实时生效;且在网关应用重启后,动态添加的路由仍然存在。

动态路由的两个基本要求:实时性和持久性。

Gateway工作原理

查看SringCloud Gateway ,Gateway 工作原理如下图:

Clients make requests to Spring Cloud Gateway. If the Gateway Handler Mapping determines that a request matches a route, it is sent to the Gateway Web Handler. This handler runs the request through a filter chain that is specific to the request. The reason the filters are divided by the dotted line is that filters can run logic both before and after the proxy request is sent. All “pre” filter logic is executed. Then the proxy request is made. After the proxy request is made, the “post” filter logic is run.

客户端请求,首先会被处理,用以在 路由表 中查找一个与请求匹配的 路由 然后将请求交由 处理, 维护了一个过滤器链,链式执行这些过滤器,这些过滤器在逻辑上存在两个执行阶段

本文重点探究 路由查找 的过程,并在此基础上,探究 持久化动态路由表 的实现方式。

Gateway源码阅读

通过阅读官方源码,梳理gateway工作机制,并寻找扩展点,以便实现 持久化动态路由表

RoutePredicateHandlerMapping

Gateway中实现路由查找逻辑的 是 类,该类在 中实现自动装配(Gateway的Bean自动装备都是由此类实现) ,源码260-266行如下

@Bean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(
    FilteringWebHandler webHandler, RouteLocator routeLocator,
    GlobalCorsProperties globalCorsProperties, Environment environment) {
  return new RoutePredicateHandlerMapping(webHandler, routeLocator,
      globalCorsProperties, environment);
}

首先可以看到,这里装配是无条件的,没有留出拓展点(我对此用了特殊的方法进行了拓展),重点是两个Bean的注入:

  • :创建过滤器链,加载全局过滤器并转化为网关过滤器,组合二者,并执行过滤器链,本文不展开这部分

  • :有多个实现类,本文重点 。

@Override
protected Mono getHandlerInternal(ServerWebExchange exchange) {
//... 忽略之上代码

  // lookupRoute 用于查找路由
  return lookupRoute(exchange)
      // 将查找到的路由记录到 ServerWebExchange 上下文中,然后,返回 FilteringWebHandler
      .flatMap((Function>) r -> {
        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
        if (logger.isDebugEnabled()) {
          logger.debug(
              "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
        }
        // 后续会从Attributes获取到路由对象,进而获取路由过滤器,执行过滤器等列操作
        exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
        return Mono.just(webHandler);

//... 忽略后续代码

 protected Mono lookupRoute(ServerWebExchange exchange) {
   // this.routeLocator.getRoutes() 该方法是重点,后续需要继续分析 
   // RouteLocator 的实现类 是如何 getRoutes()
   return this.routeLocator.getRoutes()
       .concatMap(route -> Mono.just(route).filterWhen(r -> {
         exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
         // 根据请求、当前路由的断言,判断当前当前路由断言是否命中,非本文重点,不展开
         return r.getPredicate().apply(exchange);
       })
//... 忽略后续代码

RouteLocator 实现类

  • : 合并组合其他RouterLocator。

  • : 带有缓存和基于事件刷新缓存机制的RouterLocator。

  • : 实现了将 转化为 的 RouterLocator,具体转化过程本文不展开,该类具有重要的桥梁作用。

// 转化方法 
@Override
public Flux getRoutes() {
  // 注意这里的 getRouteDefinitions() 
	Flux routes = this.routeDefinitionLocator.getRouteDefinitions()
			.map(this::convertToRoute);
  // ... 下略

我们再次看下 中自动装配的情况,源码223-240行

@Bean
public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
  // 注入所有过滤器工厂(不含全局过滤器),用于根据RouteDefinition组装Route
  List gatewayFilters,
  // 注入所有断言工厂,用于根据RouteDefinition组装Route
  List predicates,
  // 注入RouteDefinition的加载器,这里是下文重点
  RouteDefinitionLocator routeDefinitionLocator,
  ConfigurationService configurationService) {
  return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates,
      gatewayFilters, properties, configurationService);
}

@Bean
@Primary
@ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")
public RouteLocator cachedCompositeRouteLocator(List routeLocators) {
  return new CachingRouteLocator(
      new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
}

中并没有直接装配 CompositeRouteLocator,而是嵌套在了CachingRouteLocator 中,上述代码块17行表示所有RouteLocator的实现类都会被装配到 中(也包含,16行巧妙的通过一个Conditional避免了对自身的循环依赖),这样注入到 其实只有 ,这里可以由玩家进行拓展。

本文不进行展开说明,在你创建路由后,需发布一个 事件,然后这个Locator就可以监听到该事件,并刷新路由。

我们看一下的构造方法

public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
    List predicates,
    List gatewayFilterFactories,
    GatewayProperties gatewayProperties,
    ConfigurationService configurationService) {
  this.routeDefinitionLocator = routeDefinitionLocator;
  this.configurationService = configurationService;
  // 断言工厂初始化,同下面过滤器器工厂类似,也有截去RoutePredicateFactory的操作
  initFactories(predicates);
  gatewayFilterFactories.forEach(
      // factory.name() 用以获取过滤器工厂的名字,具体代码实现就是 末尾截去"GatewayFilterFactory"
      // 这也是为什么我们自定义过滤器工厂时需要以GatewayFilterFactory结尾命名
      factory -> this.gatewayFilterFactories.put(factory.name(), factory));
  this.gatewayProperties = gatewayProperties;
}

结合上面代码块的内容,例如我们在使用Gateway 时定义一个路由:

spring:
  cloud:
    gateway:
      routes:
      - id: path_route
        uri: https://example.org
        predicates:
        - Path=/red/{segment},/blue/{segment}
        filters:
        - AddRequestHeader=X-Request-red, blue

Path 对应 PathRoutePredicateFactory,AddRequestHeader 对应 AddRequestHeaderGatewayFilterFactory。

RouteDefinitionLocator 实现类

装配 RouteDefinitionRouteLocator 时注入了一个 类型为 的Bean,回到 看 RouteDefinitionLocator 是如何装配的,源码208-214行

@Bean
@Primary 
// 注意 @Primary 决定了上面被注入的是这个Bean,
// 在GatewayAutoConfiguration 中装配了多个RouteDefinitionLocator的子类,包括
// CompositeRouteDefinitionLocator、InMemoryRouteDefinitionRepository,
// 还在其他配置类中装配的
// PropertiesRouteDefinitionLocator 、DiscoveryClientRouteDefinitionLocator 
public RouteDefinitionLocator routeDefinitionLocator(
  List routeDefinitionLocators) {
  return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
}

这里采用了一个 对所有 的实现类进行了组合封装,这些实现了,都实现了具体的 方法。

  • 基于配置文件实现的路由加载器,配置文件修改后不支持动态加载

  • 基于服务发现实现的路由加载器,可动态发现新服务

是对上面DiscoveryClientRouteDefinitionLocator配套的过滤器、断言的自动装配,默认关闭此加载器,需要通过配置文件开启(如无需要不开启,影响性能)。

# 开启此locator
spring.cloud.gateway.discovery.locator.enabled=true
# 开启默认为 reactive 模式,需显示关闭可调整为阻塞模式
spring.cloud.discovery.reactive.enabled=false
  • 通过翻阅源码,发现此加载器并未实装,可能是考虑在底层RouteLocator已经具备了缓存。

  • 基于内存存储的路由加载器,可以通过SpringCloud Gateway提供的 management endpoint 进行路由管理,但由于基于内存实现,并未持久化。

通过 查看其装配代码,源码202-206行:

@Bean
// 只有在没有装配RouteDefinitionRepository的其他实现Bean时,才生效,
// 我们可以通过该扩展点实现动态路由及持久化
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
	return new InMemoryRouteDefinitionRepository();
}

Management Endpoints:

  • 是对其他 RouteDefinitionLocator 的组合,通过 可以看到其装配代码,源码208-214行

@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
    List routeDefinitionLocators) {
  return new CompositeRouteDefinitionLocator(
      Flux.fromIterable(routeDefinitionLocators));
}

可以看到前面 中注入的 就是CompositeRouteDefinitionLocator,而装配它是注入了所有RouteDefinitionLocator的实现,其中包括RouteDefinitionRepository的实现(默认情况下为InMemoryRouteDefinitionRepository)。

到这里就比较明确了,如果需要实现可持久化存储的动态路由,我们只需基于数据库(或其他持久化存储),参考InMemoryRouteDefinitionRepository实现一个RouteDefinitionRepository即可。

实现基于MongoDB的动态路由

上文归纳总结如下:

全部代码见:

主要代码片段

@Slf4j
@Component
public class MongoRouteDefinitionRepository
    implements RouteDefinitionRepository, ApplicationEventPublisherAware {

  private static final String CACHE_KEY = "routes";

  private ApplicationEventPublisher eventPublisher;

  private Map cache = new ConcurrentHashMap<>();

  private final RouteRepositoryOperations repositoryOperation;

  public MongoRouteDefinitionRepository(RouteRepositoryOperations repositoryOperation) {
    this.repositoryOperation = repositoryOperation;
  }

  @Override
  public Flux getRouteDefinitions() {
    return Flux.fromIterable(cache.values());
  }

  @Override
  public Mono save(Mono route) {
    return route.flatMap(
        r -> repositoryOperation.save(MongoRouteDefinition.from(r))
            .log()
            .doOnNext(this::addCache)
            .then(Mono.empty())
    );
  }

  @Override
  public Mono delete(Mono routeId) {
    return repositoryOperation.findById(routeId)
        .log()
        .map(RouteDefinition::getId)
        .doOnNext(this::removeCache)
        .flatMap(repositoryOperation::deleteById);
  }

  @Override
  public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) {
    this.eventPublisher = eventPublisher;
  }

  /**
   * 将指定路由加入到缓存中。
   * 

* 为了能实时加载路由,可以通过MongoDB的ChangeStream,监听到数据变化后调用此方法 */ public void addCache(RouteDefinition route) { this.cache.putIfAbsent(route.getId(), route); this.publishEvent(); } /** * 将指定路由从缓存中删除。 *

* 为了能实时加载路由,可以通过MongoDB的ChangeStream,监听到数据变化后调用此方法 */ public void removeCache(String routeId) { if (this.cache.remove(routeId) != null) { this.publishEvent(); } } void publishEvent() { eventPublisher.publishEvent(new RefreshRoutesEvent(this)); } RouteRepositoryOperations getRepositoryOperation() { return repositoryOperation; } Map getCache() { return cache; } void setCache( Map cache) { this.cache = cache; } }

@Slf4j
@Component
@ConditionalOnProperty(value = "route.schedule.enabled", havingValue = "true", matchIfMissing = true)
public class RouteRefresher {

  private final MongoRouteDefinitionRepository repository;

  public RouteRefresher(
      MongoRouteDefinitionRepository repository) {
    this.repository = repository;
  }

  /**
   * 固定间隔重新加载一次缓存
   */
  @Scheduled(initialDelay = 10000, fixedDelay = 60 * 60 * 1001)
  private void refresh() {
    RouteRepositoryOperations operation = repository.getRepositoryOperation();

    int page = 0;
    int pageSize = 1000;
    int total = Math.toIntExact(operation.count().blockOptional().orElse(0L));
    Map oldCache = repository.getCache();
    Map newCache = new ConcurrentHashMap<>(total);
    int oldTotal = oldCache.size();
    if (oldTotal < 1) {
      // 首次同步刷新
      repository.setCache(newCache);
    }
    while (page * pageSize < total) {
      operation.findAll(PageRequest.of(page++, pageSize))
          .doOnNext(route -> newCache.putIfAbsent(route.getId(), route))
          .blockLast();
      log.info("动态路由表当前总大小为:{}, 新路由表当前大小为:{}", oldTotal, newCache.size());
    }
    repository.setCache(newCache);
    log.info("新路由表加载完成,当前大小为:{}", newCache.size());
    repository.publishEvent();
  }
}

@Component
@ConditionalOnProperty(value = "changeStream.enabled", havingValue = "true", matchIfMissing = true)
public class RouteChangeStreamHandler implements CommandLineRunner {

  private final ReactiveMongoTemplate mongoTemplate;
  private final MongoRouteDefinitionRepository routeRepository;

  public RouteChangeStreamHandler(
      MongoRouteDefinitionRepository routeRepository, ReactiveMongoTemplate mongoTemplate) {
    this.routeRepository = routeRepository;
    this.mongoTemplate = mongoTemplate;
  }

  @Override
  public void run(String... args) {
    new Thread(this::startMonitor, "ChangeStream-Monitor-routes").start();
  }

  public void startMonitor() {
    Aggregation aggregation = Aggregation.newAggregation(Aggregation
        .match(Criteria.where("operationType").in("insert", "delete", "update", "replace")));

    ChangeStreamOptions options = ChangeStreamOptions.builder()
        .filter(aggregation)
        .returnFullDocumentOnUpdate()
        .build();

    String collectionName = MongoRouteDefinition.class.getAnnotation(Document.class).value();
    Flux> changeStream = mongoTemplate
        .changeStream(collectionName, options, MongoRouteDefinition.class);

    changeStream
        .log()
        .doOnNext(e -> {
          if (OperationType.INSERT == e.getOperationType()
              || OperationType.UPDATE == e.getOperationType()
              || OperationType.REPLACE == e.getOperationType()) {
            Optional.ofNullable(e.getBody()).ifPresent(routeRepository::addCache);
          } else if (OperationType.DELETE == e.getOperationType()) {
            getId(e).ifPresent(routeRepository::removeCache);
          }
        }).blockLast();
  }

  private Optional getId(ChangeStreamEvent e) {
    return Optional.ofNullable(e.getRaw())
        .flatMap(raw -> Optional.ofNullable(raw.getDocumentKey()))
        .flatMap(docKey -> Optional.ofNullable(docKey.getObjectId("_id")))
        .flatMap(bson -> Optional.of(bson.getValue().toHexString()));
  }
}

@Document("gwRoutes")
public class MongoRouteDefinition extends RouteDefinition {

  public static MongoRouteDefinition from(RouteDefinition route) {
    MongoRouteDefinition newRoute = new MongoRouteDefinition();
    BeanUtils.copyProperties(route, newRoute);
    return newRoute;
  }
}

public interface RouteRepositoryOperations extends
    ReactiveMongoRepository {

  /**
   * 分页查询
   *
   * @param pageable 分页
   * @return 当前页
   */
  @Query(value = "{}", sort = "{_id:1}")
  Flux findAll(Pageable pageable);
}