Bootstrap

【FlinkSQL】Flink Table & SQL 时间属性

主要引用官方文档

Flink 可以基于几种不同的时间概念来处理数据。

  • 处理时间:指的是执行具体操作时的机器时间

  • 事件时间:指的是数据本身携带的时间。这个时间是在事件产生时的时间。

  • 摄入时间:指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。

像窗口这种基于时间的操作,需要有时间信息。因此,Table API 和 SQL 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。

可以在 CREATE TABLE DDL 创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。

只要时间属性没有被修改,只是从一个表传递到另一个表,就仍然是一个有效的时间属性,可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,就会变成一个普通的时间戳,普通时间戳不具备时间属性的特性。

定义处理时间

处理时间是基于机器的本地时间来处理数据,是最简单的一种时间概念,但是不能提供确定性。既不需要从数据里获取时间,也不需要生成 watermark。

在创建表的 DDL 中

用 就可以定义处理时间,函数 的返回类型是

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
  ...
);


SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

在 DataStream 到 Table 转换时定义

在 schema 定义的时候用 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后

DataStream> stream = ...;

// 声明一个额外的字段作为时间属性字段
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

// 定义 window
WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
            .on($("user_action_time"))
            .as("userActionWindow"));

使用 TableSource 定义

可以在实现了 的 TableSource 中定义。逻辑的时间属性会放在 TableSource 已有物理字段的最后

// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource, DefinedProctimeAttribute {

  @Override
  public TypeInformation getReturnType() {
    String[] names = new String[] {"user_name" , "data"};
    TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream getDataStream(StreamExecutionEnvironment execEnv) {
    // create stream
    DataStream stream = ...;
    return stream;
  }

    // 定义事件事件
  @Override
  public String getProctimeAttribute() {
    // 这个名字的列会被追加到最后,作为第三列
    return "user_action_time";
  }
}

// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());

// 定义 window
WindowedTable windowedTable = tEnv
  .from("user_actions")
  .window(Tumble
      .over(lit(10).minutes())
      .on($("user_action_time"))
      .as("userActionWindow"));

定义事件事件

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。

事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。

为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产 watermark。

在创建表的 DDL 中

事件时间属性可以用 语句在 CREATE TABLE DDL 中进行定义。

语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。

Flink 支持和在 列和 列上定义事件时间。如果源数据中的时间戳数据表示为“年-月-日-时-分-秒”,则通常为不带时区信息的字符串值,例如 2020-04-15 20:13:40.564,建议将事件时间属性定义在 列上:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);


SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

源数据中的时间戳数据表示为一个 Epoch 时间,通常是一个 long 值,例如 1618989564564,建议将事件时间属性定义在 列上:

CREATE TABLE user_actions (
 user_name STRING,
 data STRING,
 ts BIGINT,
 time_ltz AS TO_TIMESTAMP_LTZ(time_ltz, 3),
 -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
 WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
 ...
);


SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

在 DataStream 到 Table 转换时定义

事件时间属性可以用 后缀在定义 DataStream schema 的时候来定义。

时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。在从 DataStream 转换到 Table 时,由于 DataStream 没有时区概念,因此 Flink 总是将 rowtime 属性解析成 类型,并且将所有事件时间的值都视为 UTC 时区的值。

在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

  • 在 schema 的结尾追加一个新的字段

  • 替换一个已经存在的字段。

不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。

// Option 1:

// 基于 stream 中的事件产生时间戳和 watermark
DataStream> stream = inputStream.assignTimestampsAndWatermarks(...);

// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


// Option 2:

// 从第一个字段获取事件时间,并且产生 watermark
DataStream> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:

WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));

使用 TableSource 定义

事件时间属性可以在实现了 的 TableSource 中定义。 方法返回 的列表,包含了描述事件时间属性的字段名字、如何计算事件时间、以及 watermark 生成策略等信息。

同时需要确保 返回的 DataStream 已经定义好了时间属性。 只有在定义了 StreamRecordTimestamp 时间戳分配器的时候,才认为 DataStream 是有时间戳信息的。 只有定义了 PreserveWatermarks watermark 生成策略的 DataStream 的 watermark 才会被保留。反之,则只有时间字段的值是生效的。

// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource, DefinedRowtimeAttributes {

  @Override
  public TypeInformation getReturnType() {
    String[] names = new String[] {"user_name", "data", "user_action_time"};
    TypeInformation[] types =
        new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream getDataStream(StreamExecutionEnvironment execEnv) {
    // 构造 DataStream
    // ...
    // 基于 "user_action_time" 定义 watermark
    DataStream stream = inputStream.assignTimestampsAndWatermarks(...);
    return stream;
  }

  @Override
  public List getRowtimeAttributeDescriptors() {
    // 标记 "user_action_time" 字段是事件时间字段
    // 给 "user_action_time" 构造一个时间属性描述符
    RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
      "user_action_time",
      new ExistingField("user_action_time"),
      new AscendingTimestamps());
    List listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
    return listRowtimeAttrDescr;
  }
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .from("user_actions")
  .window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));