Bootstrap

Flink Metric

主要引用官方文档

Flink 提供了 Metric 系统,允许收集 Metric 并暴露给外部系统。

注册 Metrics

可以通过任何继承了 RichFunction 的函数访问 Metric 系统。调用 方法,该方法返回一个 MetricGroup 对象,可以创建并注册 Metric。

Metric 类型

Counter

Counter 用来计数。当前值可以使用 / 或 / 进行增减。

// 实现 RichMapFunction 接口
public class MyMapper extends RichMapFunction {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    // 定义一个 Counter Metric
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    // Counter 增加 1
    this.counter.inc();
    return value;
  }
}

Gauge

Gauge 根据需要提供任何类型的值。需要先创建一个实现 的类,返回值的类形没有限制。

Report 程序在暴露数据给外部系统时,会把对象转换为字符串,这意味着需要一个有意义的 实现。

public class MyMapper extends RichMapFunction {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge() {
        // 实现 org.apache.flink.metrics.Gauge 接口
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}

Histogram

Histogram 统计值的分布。

public class MyMapper extends RichMapFunction {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 加入一个新值
    this.histogram.update(value);
    return value;
  }
}

Flink 没有提供 Histogram 的默认实现,可以添加依赖使用 DropwizardHistogramWrapper 实现


      org.apache.flink
      flink-metrics-dropwizard
      1.13.0

Meter

Meter 用来统计平均吞吐量。

public class MyMapper extends RichMapFunction {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 注册事件
    // markEvent(long n) 可以注册同时发生多个时间
    this.meter.markEvent();
    return value;
  }
}

同样添加 依赖,可以使用 DropwizardMeterWrapper 实现

Scope

每个 Metric 都会分配一个标识符和一组键值对,用来报告 Metric。

标识符基于3个组成部分:注册时的用户定义名称、可选的用户定义 Scope 和系统提供的 Scope。例如,如果 A.B 是系统 Scope,C.D 是用户 Scope,E 是名称,那么标识符将是 A.B.C.D.E。

可以通过在 中设置 键来配置用于标识符的分隔符(默认值:.)。

User Scope

定义 User Scope 的方法: 调用 ,,。这些方法会影响 和 的返回值。

// 创建 Metric 时指定 Scope
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");

System Scope

System Scope 包含 Metric 的上下文信息,例如注册在哪个 Task()或属于哪个 Job()。

应该包含哪些上下文信息可以通过 配置。

  • 默认值:

  • JobManager 的所有 Metric

  • 默认值:

  • JobManager 和 Job 的所有 Metric

  • 默认值:

  • TaskManager 的所有 Metric

  • 默认值:

  • TaskManager 和 Job 的所有 Metric

  • 默认值:

  • Task 的所有 Metric

  • 默认值:

  • Operator 的所有 Metric

可以作为变量使用。变量的数量或顺序没有限制,区分大小写。

例如:Operator Metric 的默认 Scope 格式为 ,生成的标识符类似 的形式;如果希望包含 Task 名称,并且忽略 TaskManager 信息,可以设置 ,生成的标识符会变成 。

建议添加带有 ID 的变量(如:)保证唯一性,避免出现命名冲突的问题。所有可以使用的变量:

  • JobManager:

  • TaskManager: ,

  • Job: ,

  • Task: , , , ,

  • Operator: , ,

Reporter

详细内容参考官方文档

Flink 允许向外部系统报告 Metric。

通过在 中配置一个或多个 Reporter,可以将 Metric 暴露给外部系统。这些 Reporter 在启动时实例化。

  • :Reporter 名称

  • :Reporter 实现类

  • :Reporter 工厂类

  • :Reporter 调用间隔

  • :Scope 标识符的分隔符(默认使用 )

  • :可选项,以 “;” 分隔的变量列表,可以忽略这些变量

  • :可选项,以 “,” 分隔的 Reporter 名称列表,表示应用哪些 Reporter,默认会包含所有配置的 Reporter。

Reporter 必须至少配置 或 属性(使用哪个取决于 Reporter 的实现)。

配置 Reporter 示例

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

自定义 Reporter:

  • 实现 org.apache.flink.metrics.reporter.MetricReporter 接口

  • 如果要定时发送报告,实现 Scheduled 接口

下面列出了一些支持的 Reporter

JMX

org.apache.flink.metrics.jmx.JMXReporter

参数:

  • port - JMX 监听端口,建议使用范围:9250-9260。实际端口将显示在相关 Job 或 Task Manager 日志中。

metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory 
metrics.reporter.jmx.port: 8789

通过 JMX 公开的 Metric 由一个 domain 和一组 key 属性组成标识。domain 总是以 org.apache.flink 开始,接一个通用 metric 标识(与一般的 metric 标识不同,不受 scope 格式的影响,不包含任何变量),例如:org.apache.flink.job.task.numBytesOut。

key 属性列表包含与给定 Metric 关联的所有变量的值(不受 scope 格式影响)。例如:。

Prometheus

org.apache.flink.metrics.prometheus.PrometheusReporter

参数:

  • port - Prometheus exporter 侦听的端口,默认为 9249,建议使用范围:9250-9260。

  • filterLabelValueCharacters - 可选项,过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

系统 Metrics

默认情况下,Flink 收集的指标

CPU

Memory

Threads

GC

ClassLoader

Default Shuffle Service

代替 Network/IO 部分 Metrics

Cluster

Availability

如果启用了 (1.13 MVP 特性),这些 Metric(除 numRestarts)不能正常工作。

Checkpointing

如果启用了 (1.13 MVP 特性),Job Scope 的 Metric 不能正常工作。

IO

Connectors

Kafka Connector

HBase Connector

延迟跟踪

Flink 允许跟踪在系统中传输的记录的延迟。默认情况下禁用此功能。要启用延迟跟踪,必须在 Flink 配置()或 ExecutionConfig 中将 设置为正数。

Source 会定期(latencyTrackingInterval)发出一个特殊的记录,称为 LatencyMarker。记录包含一个时间戳,该时间戳从记录在源处发出时算起。LatencyMarker 不能超过(overtake)正常记录,因此如果正常记录在 Operator 前排队,将增加标记跟踪的延迟。

延迟监控的粒度,分为以下3档:

  • single:每个算子单独统计延迟;

  • operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟;

  • subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。

需要注意:

  • LatencyMarker 记录的时间戳最终是靠 方法获取本地时间,要保证 Flink 集群内所有节点的时区、时间是同步的,可以用 NTP 等工具来配置。

  • 启用延迟 metric 会影响集群的性能(特别是 subtask 粒度)。官方建议仅用于调试目的。

REST API Integration

Metrics 可以通过 REST API 查询。下面列出一些可用的 Endpoint 和 JSON 返回格式。

Base URL:http://hostname:8081/jobmanager/metrics

查询 Metric 未聚合值

查询 Metric 聚合值

查询 Metric 部分值的聚合值

特殊字符需要转义(符合 URL 标准)

查看 Metric 列表

GET /jobmanager/metrics

[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

请求特定 Metric 的值(未聚合)

GET taskmanagers//metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

请求特定 Metric 的聚合值

GET /taskmanagers/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

请求特定 Metric 的特定值的聚合值

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
  }
]

Dashboard Integration

为 Task 或 Operator 收集的 Metric 也可以在仪表板中可视化。在作业的主页面上,选择 Metrics 选项卡。在 Graph 中选择一个任务后,可以使用 Add Metric 下拉菜单选择要显示的 Metric。

  • Task metrics 列表样式

  • Operator metrics 列表样式

每个 Metric 可以被可视化为一个单独的图形,x轴表示时间,y轴表示测量值。图表每10秒自动更新一次。