一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

@

概述

定义

flink-cdc-connectors 官网 https://github.com/ververica/flink-cdc-connectors 源码release最新版本2.4.0

flink-cdc-connectors 文档地址 https://ververica.github.io/flink-cdc-connectors/master/

flink-cdc-connectors 源码地址 https://github.com/ververica/flink-cdc-connectors

CDC Connectors for Apache Flink 是Apache Flink的一组源连接器,使用更改数据捕获(CDC)从不同的数据库摄取更改,其集成了Debezium作为捕获数据变化的引擎,因此它可以充分利用Debezium的能力。

Flink CDC是由Flink社区开发的flink-cdc-connectors 的source组件,基于数据库日志的 Change Data Caputre 技术,实现了从 MySQL、PostgreSQL 等数据库全量和增量的一体化读取能力,并借助 Flink 优秀的管道能力和丰富的上下游生态,支持捕获多种数据库的变更,并将这些变更实时同步到下游存储。

什么是CDC?

这里也简单说明下,CDC为三个英文Change Data Capture(变更数据捕获)的缩写,核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其它服务进行订阅及消费。

CDC的分类

CDC主要分为基于查询的CDC和基于binlog的CDC,两者之间区别主要如下:

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

特性

  • 支持读取数据库快照,即使发生故障,也只进行一次处理,继续读取事务日志。
  • 数据流API的CDC连接器,用户可以在单个作业中消费多个数据库和表上的更改,而无需部署Debezium和Kafka。
  • 用于表/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源来监视单个表上的更改。

应用场景

  • 数据分发,将一个数据源分发给多个下游,常用于业务解耦、微服务。
  • 数据集成,将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。
  • 数据迁移,常用于数据库备份、容灾等。

支持数据源

CDC Connectors for Apache Flink支持从多种数据库到Flink摄取快照数据和实时更改,然后转换和下沉到各种下游系统

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

支撑数据源包括如下:

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

实战

这里以MySQL作为数据源为例,通过flink-connector-mysql-cdc实现数据变更获取,先准备MySQL环境,这里复用前面<<实时采集MySQL数据之轻量工具Maxwell实操>>的文章环境,数据库有两个my_maxwell_01,my_maxwell_02,每个数据库都有相同account和product表。pom文件引入依赖

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>      <groupId>cn.itxs.flink</groupId>     <artifactId>flink-cdc-demo</artifactId>     <version>1.0-SNAPSHOT</version>      <properties>         <maven.compiler.source>8</maven.compiler.source>         <maven.compiler.target>8</maven.compiler.target>         <flink.version>1.17.1</flink.version>         <flink.cdc.version>2.4.0</flink.cdc.version>         <mysql.client.version>8.0.29</mysql.client.version>         <fastjson.version>1.2.83</fastjson.version>     </properties>      <dependencies>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-streaming-java</artifactId>             <version>${flink.version}</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-clients</artifactId>             <version>${flink.version}</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-connector-base</artifactId>             <version>${flink.version}</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-table-api-java-bridge</artifactId>             <version>${flink.version}</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-table-planner-loader</artifactId>             <version>${flink.version}</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>org.apache.flink</groupId>             <artifactId>flink-table-runtime</artifactId>             <version>${flink.version}</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>mysql</groupId>             <artifactId>mysql-connector-java</artifactId>             <version>${mysql.client.version}</version>         </dependency>         <dependency>             <groupId>com.ververica</groupId>             <artifactId>flink-connector-mysql-cdc</artifactId>             <version>${flink.cdc.version}</version>         </dependency>         <dependency>             <groupId>com.alibaba</groupId>             <artifactId>fastjson</artifactId>             <version>${fastjson.version}</version>         </dependency>     </dependencies>      <build>         <plugins>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-shade-plugin</artifactId>                 <version>3.2.4</version>                 <executions>                     <execution>                         <phase>package</phase>                         <goals>                             <goal>shade</goal>                         </goals>                         <configuration>                             <artifactSet>                                 <excludes>                                     <exclude>com.google.code.findbugs:jsr305</exclude>                                     <exclude>org.slf4j:*</exclude>                                     <exclude>log4j:*</exclude>                                 </excludes>                             </artifactSet>                             <filters>                                 <filter>                                     <!-- Do not copy the signatures in the META-INF folder.                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->                                     <artifact>*:*</artifact>                                     <excludes>                                         <exclude>META-INF/*.SF</exclude>                                         <exclude>META-INF/*.DSA</exclude>                                         <exclude>META-INF/*.RSA</exclude>                                     </excludes>                                 </filter>                             </filters>                             <transformers combine.children="append">                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">                                 </transformer>                             </transformers>                         </configuration>                     </execution>                 </executions>             </plugin>         </plugins>     </build> </project> 

创建DataStreamDemo.java,

package cn.itxs.cdc;  import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class DataStreamDemo {     public static void main(String[] args) throws Exception {         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()                 .hostname("mysqlserver")                 .port(3306)                 .databaseList("my_maxwell_01,my_maxwell_02")                 .tableList("my_maxwell_01.*,my_maxwell_02.product")                 .username("root")                 .password("123456")                 .startupOptions(StartupOptions.initial())                 .deserializer(new JsonDebeziumDeserializationSchema()) // 将SourceRecord转换为JSON字符串                 .build();          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          // 开启checkpoint         env.enableCheckpointing(3000);          env                 .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")                 // 设置平行度为4                 .setParallelism(4)                 .print().setParallelism(1); // 对sink打印使用并行性1来保持消息顺序          env.execute("Print MySQL Snapshot + Binlog");     } } 

mySqlSource的startupOptions(MySQL CDC 消费者可选的启动模式,SQL对应为scan.startup.mode选项)有几种可选启动模式,包括"initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp",默认(也即是不设置)为StartupOptions.initial。

由于上面flink的依赖配置provided,因此在IDEA中启动的话需要勾选下面标红的选项

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

启动程序,查看日志可以看到从mysql读取目前全量的数据,my_maxwell_02也只读取product表数据

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

修改两个库的表后可以看到相应修改信息,其中也确认my_maxwell_02的account没有读取变更数据。

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

{"before":{"id":7,"name":"李丹","age":44},"after":{"id":7,"name":"李丹","age":48},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856595000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"account","server_id":1,"gtid":null,"file":"binlog.000025","pos":2798,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856598620,"transaction":null} {"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856605000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3140,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856608748,"transaction":null} {"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856628000,"snapshot":"false","db":"my_maxwell_02","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3486,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856631643,"transaction":null} 

如果需要断点续传可以使用状态后端存储来实现,在集群运行可增加checkpoint的相关配置实现持久化

        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5), CheckpointingMode.EXACTLY_ONCE);         CheckpointConfig checkpointConfig = env.getCheckpointConfig();         checkpointConfig.setCheckpointStorage("hdfs://hadoop2:9000/checkpoints/flink/cdc");         checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));         checkpointConfig.setTolerableCheckpointFailureNumber(5);         checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));         checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 

增加打包后放到集群上,执行

./bin/flink run -m hadoop1:8081 -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar  

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

可以看到的日志也成功输出表的全量的日志和刚才修改增量数据

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

我这里由于中间停止过重新运行,job编号已变化。因此用新的job,设置保存点

./bin/flink savepoint e4ef292c211975c3128edfe86fbff231 hdfs://hadoop2:9000/savepoints/flink 

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

在保存点后执行完后修改MySQL的数据;在上面hdfs上目录找到对应创建保存点文件夹,并停止上面运行中的job,从保存点重启启动job

./bin/flink run -m hadoop1:8081 -s hdfs://hadoop2:9000/savepoints/flink/savepoint-7e6b99-99d88b51d00c -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar  

启动后也修改MySQL的数据,可以看到启动后只读取了后面修改两条数据已中断继续读取功能

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

FlinkSQL方式代码示例

创建SqlDemo.java文件

package cn.itxs.cdc;  import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row;  public class SqlDemo {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);         tableEnv.executeSql("CREATE TABLE account (n" +                 " id INT NOT NULL,n" +                 " name STRING,n" +                 " age INT,n" +                 " PRIMARY KEY(id) NOT ENFORCEDn" +                 ") WITH (n" +                 " 'connector' = 'mysql-cdc',n" +                 " 'hostname' = 'mysqlserver',n" +                 " 'port' = '3306',n" +                 " 'username' = 'root',n" +                 " 'password' = '123456',n" +                 " 'database-name' = 'my_maxwell_01',n" +                 " 'table-name' = 'account'n" +                 ");");          Table table = tableEnv.sqlQuery("select * from account");         DataStream<Row> rowDataStream = tableEnv.toChangelogStream(table);         rowDataStream.print("account_binlog====");         env.execute();     } } 

启动程序,查看日志可以看到从mysql读取my_maxwell_01库account表的全量的数据,修改表数据也确认读取变更数据。

一文解开主流开源变更数据捕获技术之Flink CDC的入门使用

  • 本人博客网站IT小神 www.itxiaoshen.com
发表评论

评论已关闭。

相关文章