Bootstrap

Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖

作者:罗宇侠

本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。Flink-CDC 项目地址:

https://github.com/ververica/flink-cdc-connectors

Flink 中文学习网站

https://flink-learning.org.cn

在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。

但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

这篇教程将展示如何使用 Flink CDC 构建实时数据湖来应对这种场景,本教程的演示基于 Docker,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE,你可以很方便地在自己的电脑上完成本教程的全部内容。

接下来将以数据从 MySQL 同步到 [1]为例展示整个流程,架构图如下所示:

一、准备阶段

准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 的方式准备所需要的组件。

使用下面的内容创建一个 文件:

version: '2.1'
services:
  sql-client:
    user: flink:flink
    image: yuxialuo/flink-sql-client:1.13.2.v1 
    depends_on:
      - jobmanager
      - mysql
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager
      MYSQL_HOST: mysql
    volumes:
      - shared-tmpfs:/tmp/iceberg
  jobmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/iceberg
  taskmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - shared-tmpfs:/tmp/iceberg
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"

该 Docker Compose 中包含的容器有:

  • SQL-Client:Flink SQL Client, 用来提交 SQL 查询和查看 SQL 的执行结果;

  • Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用来执行 Flink SQL;

  • MySQL:作为分库分表的数据源,存储本教程的 表。

在 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 来观察上述的容器是否正常启动了,也可以通过访问 来查看 Flink 是否运行正常。

注意:

1.2 准备数据

    docker-compose exec mysql mysql -uroot -p123456

    CREATE DATABASE db_1;
    USE db_1;
    CREATE TABLE user_1 (
      id INTEGER NOT NULL PRIMARY KEY,
      name VARCHAR(255) NOT NULL DEFAULT 'flink',
      address VARCHAR(1024),
      phone_number VARCHAR(512),
      email VARCHAR(255)
    );
    INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");
   
    CREATE TABLE user_2 (
      id INTEGER NOT NULL PRIMARY KEY,
      name VARCHAR(255) NOT NULL DEFAULT 'flink',
      address VARCHAR(1024),
      phone_number VARCHAR(512),
      email VARCHAR(255)
    );
   INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");

   CREATE DATABASE db_2;
   USE db_2;
   CREATE TABLE user_1 (
     id INTEGER NOT NULL PRIMARY KEY,
     name VARCHAR(255) NOT NULL DEFAULT 'flink',
     address VARCHAR(1024),
     phone_number VARCHAR(512),
     email VARCHAR(255)
   );
   INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);
   
   CREATE TABLE user_2 (
     id INTEGER NOT NULL PRIMARY KEY,
     name VARCHAR(255) NOT NULL DEFAULT 'flink',
     address VARCHAR(1024),
     phone_number VARCHAR(512),
     email VARCHAR(255)
   );
   INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");

二、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,使用如下的命令进入 Flink SQL CLI 容器中:

docker-compose exec sql-client ./sql-client

我们可以看到如下界面:

然后,进行如下步骤:

   -- Flink SQL
   -- 每隔 3 秒做一次 checkpoint                 
   Flink SQL> SET execution.checkpointing.interval = 3s;

   -- Flink SQL
   Flink SQL> CREATE TABLE user_source (
       database_name STRING METADATA VIRTUAL,
       table_name STRING METADATA VIRTUAL,
       `id` DECIMAL(20, 0) NOT NULL,
       name STRING,
       address STRING,
       phone_number STRING,
       email STRING,
       PRIMARY KEY (`id`) NOT ENFORCED
     ) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = 'mysql',
       'port' = '3306',
       'username' = 'root',
       'password' = '123456',
       'database-name' = 'db_[0-9]+',
       'table-name' = 'user_[0-9]+'
     );

   -- Flink SQL
   Flink SQL> CREATE TABLE all_users_sink (
       database_name STRING,
       table_name    STRING,
       `id`          DECIMAL(20, 0) NOT NULL,
       name          STRING,
       address       STRING,
       phone_number  STRING,
       email         STRING,
       PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
     ) WITH (
       'connector'='iceberg',
       'catalog-name'='iceberg_catalog',
       'catalog-type'='hadoop',  
       'warehouse'='file:///tmp/iceberg/warehouse',
       'format-version'='2'
     );

三、流式写入 Iceberg

   -- Flink SQL
   Flink SQL> INSERT INTO all_users_sink select * from user_source;

上述命令将会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。在 [4] 上可以看到这个运行的作业:

然后我们就可以使用如下的命令看到 Iceberg 中的写入的文件:

   docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

如下所示:

在你的运行环境中,实际的文件可能与上面的截图不相同,但是整体的目录结构应该相似。

   -- Flink SQL
   Flink SQL> SELECT * FROM all_users_sink;

在 Flink SQL CLI 中我们可以看到如下查询结果:

修改 MySQL 中表的数据,Iceberg 中的表 中的数据也将实时更新:

(3.1) 在 表中插入新的一行

   --- db_1
   INSERT INTO db_1.user_1 VALUES (111,"user_111","Shanghai","123567891234","user_111@foo.com");

(3.2) 更新 表的数据

   --- db_1
   UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;

(3.3) 在 表中删除一行

   --- db_2
   DELETE FROM db_2.user_2 WHERE id=220;

每执行一步,我们就可以在 Flink Client CLI 中使用 查询表 来看到数据的变化。

最后的查询结果如下所示:

从 Iceberg 的最新结果中可以看到新增了的记录,的地址更新成了 ,且的记录被删除了,与我们在 MySQL 做的数据更新完全一致。

四、环境清理

本教程结束后,在 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

五、总结

在本文中,我们展示了如何使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。用户也可以同步其他数据库(Postgres/Oracle)的数据到 Hudi 等数据湖中。最后希望通过本文,能够帮助读者快速上手 Flink CDC 。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

注释:

[1] https://iceberg.apache.org/

[2] https://github.com/luoyuxia/flink-cdc-tutorial/tree/main/flink-cdc-iceberg-demo/sql-client

[3] https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/

Flink Forward Asia 2021

2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。

大会官网:https://flink-forward.org.cn

大会线上观看地址 (记得预约哦):https://developer.aliyun.com/special/ffa2021/live

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群第一时间获取最新技术文章和社区动态,请关注公众号~