This article walks through syncing MySQL data to Hudi (and on to Hive) in real time with Flink CDC.
Environment setup
Software environment
Databases
- MySQL 5.6
If you need to set one up yourself, you can spin one up with Docker:
docker run -it --rm --name mysql -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw \
  debezium/example-mysql:1.6
- Hive 2.2.1-cdh6.3.1
Configuration
MySQL
Create a dedicated user with the privileges needed to read the binlog:
CREATE USER 'debezium'@'%' IDENTIFIED BY 'xxx';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'xxx';
FLUSH PRIVILEGES;
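If you used the debezium/example-mysql image above, the binlog is already enabled in ROW mode. On a self-managed MySQL instance the CDC connector also needs a row-based binlog; a quick check, plus a minimal my.cnf sketch (keys shown as comments) if it is not enabled yet:

SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW

-- if not enabled, add to my.cnf and restart mysqld:
-- [mysqld]
-- server-id        = 1
-- log_bin          = mysql-bin
-- binlog_format    = ROW
-- binlog_row_image = FULL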
Flink
The main changes are enabling checkpointing and the settings for running on YARN; the modified parts of flink-conf.yaml look roughly like this:
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4g
taskmanager.memory.process.size: 16g
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1

execution.checkpointing.interval: 3min
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 20min
execution.checkpointing.tolerable-failed-checkpoints: 10
execution.checkpointing.unaligned: true

state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /data/flink/rocksdb,/data1/flink/rocksdb
state.checkpoints.dir: hdfs://xxx:8020/flink/1_14_4/flink-checkpoints
state.savepoints.dir: hdfs://xxx:8020/flink/1_14_4/flink-savepoints

execution.target: yarn-per-job
jobmanager.execution.failover-strategy: region
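Besides flink-conf.yaml, the sql-client needs the MySQL CDC connector and the Hudi Flink bundle on its classpath, usually by dropping the jars into $FLINK_HOME/lib. The artifact names below are only illustrative assumptions for Flink 1.14 / Scala 2.12; use the builds that match your own Flink, Scala, Hudi, and CDC versions:

# illustrative jar names -- adjust to your Flink/Scala/Hudi/CDC versions
cp flink-sql-connector-mysql-cdc-2.2.1.jar $FLINK_HOME/lib/
cp hudi-flink1.14-bundle_2.12-0.11.1.jar   $FLINK_HOME/lib/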
Flink catalog configuration
Since I run everything through the Flink sql-client, the catalog is configured as follows:
init.sql
CREATE CATALOG HiveCatalog WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/etc/hive/conf'
);

USE CATALOG HiveCatalog;
With this in place, start the sql-client with the init file to enter the interactive shell:
./bin/sql-client.sh -i conf/init.sql
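Because the catalog is backed by the Hive metastore, table definitions created from the sql-client persist across sessions. Once inside the shell, a quick way to confirm the metastore connection is working:

SHOW DATABASES;
SHOW TABLES;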
If the CDC statements that follow are saved into a SQL file, it can also be executed directly by passing -f:
./bin/sql-client.sh -i conf/init.sql -f sql/test.sql
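Since execution.target is yarn-per-job, each -f submission starts its own YARN application, named after yarn.application.name from the template below; it can be checked (or killed) with the usual YARN CLI, for example:

yarn application -list | grep FlinkCDC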
Template of the SQL to execute
set yarn.application.name=FlinkCDC-yourappname;
set pipeline.name=FlinkCDC-pipelinename;
set yarn.application.queue=users.cdc;
SET execution.checkpointing.interval=5min;
SET parallelism.default = 4;

DROP TABLE IF EXISTS test_flink;
CREATE TABLE IF NOT EXISTS `test_flink` (
  `id` bigint,
  `db_id` STRING,
  ...
  `gmtupdated` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxx.mysql.polardb.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'xxx',
  'server-id' = '5500-5508',
  'server-time-zone' = 'Asia/Shanghai',
  'database-name' = 'db_name',
  'table-name' = 'table'
);

DROP TABLE IF EXISTS test_hudi;
CREATE TABLE IF NOT EXISTS `test_hudi` (
  `id` bigint PRIMARY KEY NOT ENFORCED,
  ...
  `db_id` STRING
) PARTITIONED BY (`db_id`) WITH (
  'connector' = 'hudi',
  'write.operation' = 'upsert',
  'write.precombine' = 'true',
  'write.precombine.field' = 'gmtupdated',
  'read.streaming.enabled' = 'true',
  'write.rate.limit' = '8000',
  'path' = 'hdfs://xxx:8020/hudi/flink/xxx',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.table' = 'xxx',
  'hive_sync.db' = 'ods',
  'hive_sync.metastore.uris' = 'thrift://xxx:9083',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO test_hudi SELECT * FROM test_flink;
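After the INSERT is submitted, the data can be sanity-checked from another sql-client session; tableau result mode prints rows directly in the terminal. Note that because the Hudi table was created with read.streaming.enabled = 'true', the query keeps tailing new commits until you cancel it:

SET sql-client.execution.result-mode = tableau;
SELECT * FROM test_hudi;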
Once the job is running and a Flink checkpoint has completed, the Hive database specified above will contain the table(s) synced over from Hudi: one table for a COW table, two for a MOR table.
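A quick check from Hive, with table names assumed from the hive_sync.db = 'ods' and hive_sync.table = 'xxx' values in the template; for MERGE_ON_READ the two tables are typically the read-optimized view (suffix _ro) and the real-time view (suffix _rt):

SHOW TABLES IN ods LIKE 'xxx*';
-- MOR: xxx_ro and xxx_rt; COW: a single table xxx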
Reading from Hive
Reading the synced Hudi tables directly from Hive may return duplicate rows. In that case set:
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
or
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
The setting below, mentioned in older Hudi docs, appears to no longer take effect:
set hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat;
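Putting the above together, a minimal Hive session sketch; the table name is again assumed from the hive_sync settings, the _rt table exposes the latest merged view of a MOR table, and Hive needs the hudi-hadoop-mr bundle on its classpath for this to work:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
select count(*) from ods.xxx_rt;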