SeaTunnel部署及其Demo

SeaTunnel部署及其Demo

SeaTunnel部署及其Demo

  • 从上图可以看出seatunnel的conf(执行命令文件),主要是source、transform、sink组成
  • 📥 Source(数据源):作用:从各种数据源读取数据
  • 🔄 Transform(数据转换):作用:对数据进行清洗、转换、加工
  • 📤 Sink(数据目标):作用:将处理后的数据写入目标系统

1环境、安装下载(国内镜像下载)

  • 环境准备:确保你的系统已安装 Java 8 或 11,并正确设置了 JAVA_HOME 环境变量。

1.1下载,解压

https://mirrors.tuna.tsinghua.edu.cn/apache/seatunnel/2.3.12/apache-seatunnel-2.3.12-bin.tar.gz  tar -zxvf apache-seatunnel-2.3.12-bin.tar.gz 

1.2下载插件(根据需要下载)

安装连接器插件:从2.2.0-beta版本开始,二进制包默认不包含连接器,需要手动安装。  进入解压后的SeaTunnel目录,执行安装脚本:  如果需要指定连接器版本(例如2.3.8),则执行 sh bin/install-plugin.sh 2.3.8。  你通常不需要全部连接器。可以编辑 config/plugin_config 文件,按格式(例如下方)指定所需插件。要让示例应用运行,通常需要 connector-fake 和 connector-console。   连接器插件的作用 连接器插件 = 数据源驱动程序  每个插件让 SeaTunnel 能够连接特定的数据源  MySQL 插件:连接 MySQL 数据库  Oracle 插件:连接 Oracle 数据库  Console 插件:输出到控制台  Fake 插件:生成测试数据 
  • 修改 config/plugin_config,只保留你需要的:(也可以不改,全部下载)
--connectors-v2-- connector-jdbc-mysql connector-jdbc-oracle connector-console  # 这个建议保留,用于调试输出 --end-- 
  • 安装插件,执行命令
# 进入 SeaTunnel 目录 sh bin/install-plugin.sh 

SeaTunnel部署及其Demo

1.3JVM参数的配置

  • 编辑 bin/seatunnel.sh 文件,在文件开头附近添加或修改
export JVM_ARGS="-Xmx2g -Xms1g -XX:MaxDirectMemorySize=1g"  # 其他配置 
  • 内存配置建议
机器内存 JVM堆内存 并行度 Batch Size
4G -Xmx2g -Xms1g 2 500
8G -Xmx4g -Xms2g 4 1000
16G -Xmx8g -Xms4g 8 2000
32G -Xmx16g -Xms8g 16 5000

2使用

2.1、特殊参数说明

source

  • 2.3.x 新版本:必须使用 query,不再支持 table 参数

sink

  • generate_sink_sql = true:生成自动插入sql。如果目标库没有表,也会自动建表

2.2、lib下增加jdbc的包(mysql必须)

SeaTunnel部署及其Demo

2.3、增加测试数据

-- mysql建表 CREATE TABLE `t_8_100w`  (   `id` bigint NOT NULL COMMENT '主键',   `name` varchar(2000) NULL COMMENT '名字',   `sex` int null COMMENT '性别:1男;2女',   `decimal_f` decimal(32, 6) NULL COMMENT '大数字',   `phone_number` varchar(20) DEFAULT '13456780000' COMMENT '电话',   `age` varchar(255) NULL COMMENT '字符串年龄转数字',   `create_time` timestamp DEFAULT CURRENT_TIMESTAMP COMMENT '新增时间',   `description` longtext NULL COMMENT '大文本',   `address` varchar(2000) NULL COMMENT '空地址转默认值:未知',   PRIMARY KEY (`id`) );  -- 新增存储过程 DELIMITER $$ CREATE PROCEDURE InsertMultipleRows_Batch(     IN start_id INT,        -- 起始ID     IN end_id INT,          -- 结束ID     IN batch_size INT       -- 批次大小 ) BEGIN     DECLARE i INT DEFAULT start_id;     DECLARE description_text LONGTEXT;     DECLARE address_text VARCHAR(255); 		DECLARE sex_text INT;     DECLARE total_to_insert INT;          SET total_to_insert = end_id - start_id;          -- 开始事务     START TRANSACTION;          WHILE i < end_id DO         -- 生成精确的1KB文本         SET description_text = REPEAT(CONCAT('DataX_Test_Text_', i, '_ABCDEFGHIJKLMN_'), 41);                  -- 根据i%2生成地址         IF i % 2 = 0 THEN             SET address_text = CONCAT('地址', i); 						SET sex_text = 1;         ELSE             SET address_text = NULL; 						SET sex_text = 2;         END IF;                  -- 插入数据         INSERT INTO t_8_100w (`id`, `name`, `sex`, `decimal_f`, `age`, `description`, `address`)          VALUES (             i,              CONCAT('名字', i),  						sex_text,             i + 0.000001,              ROUND((RAND() * 12) + 18),             description_text,             address_text         );                  SET i = i + 1;                  -- 每batch_size条提交一次         IF i % batch_size = 0 OR i = end_id THEN             COMMIT;             IF i < end_id THEN                 START TRANSACTION;             END IF;                          -- 显示进度             IF i % 50000 = 0 OR i = end_id THEN                 SELECT CONCAT('批次 ', start_id, '-', end_id, ': 已插入 ', i - start_id, ' / ', total_to_insert, ' 条记录') AS progress;             END IF;         END IF;     END WHILE;          SELECT CONCAT('批次完成! ID范围: ', start_id, ' 到 ', end_id - 1, ' (共', total_to_insert, '条)') AS batch_complete; END$$ DELIMITER ;  -- 分别执行新增数据 -- 测试1万条 CALL InsertMultipleRows_Batch(0, 10000, 500); -- 每10万条创建一次,分批执行 CALL InsertMultipleRows_Batch(10000, 100000, 1000); CALL InsertMultipleRows_Batch(100000, 200000, 1000); CALL InsertMultipleRows_Batch(200000, 300000, 1000); CALL InsertMultipleRows_Batch(300000, 400000, 1000); CALL InsertMultipleRows_Batch(400000, 500000, 1000); CALL InsertMultipleRows_Batch(500000, 600000, 1000); CALL InsertMultipleRows_Batch(600000, 700000, 1000); CALL InsertMultipleRows_Batch(700000, 800000, 1000); CALL InsertMultipleRows_Batch(800000, 900000, 1000); CALL InsertMultipleRows_Batch(900000, 1000000, 1000); 

2.4、DEMO1(直接把采集数据打印到控制面板)

# test2mysql.conf - 测试源数据 env {   # 并行度(线程数)   execution.parallelism = 2   job.mode = "BATCH" }  source {   Jdbc {     url = "jdbc:mysql://ip:port/Cs1"     driver = "com.mysql.cj.jdbc.Driver"     user = "root"     password = "******"     query = "select * from t_sea_01"          # 连接参数     connection_check_timeout_sec = 300     properties = {       useUnicode = true       characterEncoding = "utf8"       serverTimezone = "Asia/Shanghai"     }   } }  sink {   Console {} } 

执行命令

./data/seatunnel/apache-seatunnel-2.3.12/bin/seatunnel.sh --config ./data/seatunnel/myconf/test2mysql.conf -m local 

查看结果
SeaTunnel部署及其Demo
真背CPU啊(2核云服务器)
SeaTunnel部署及其Demo

2.5、DEMO2(mysql2mysql的不同库)

  • 可以测试自动建表
-- 在mysql另一个数据库执行 CREATE TABLE `t_8_100w_import`  (   `id` bigint NOT NULL COMMENT '主键',   `name` varchar(2000) NULL COMMENT '名字',   `sex` int null COMMENT '性别:1男;2女',   `decimal_f` decimal(32, 6) NULL COMMENT '大数字',   `phone_number` varchar(20) COMMENT '电话',   `age` varchar(255) NULL COMMENT '字符串年龄转数字',   `create_time` timestamp COMMENT '新增时间',   `description` longtext NULL COMMENT '大文本',   `address` varchar(2000) NULL COMMENT '空地址转默认值:未知',   PRIMARY KEY (`id`) ); 
  • conf文件
# mysql2mysql.conf # 2025-11-28 16:47:47 #  env {   execution.parallelism = 8   job.mode = "BATCH" }  source {   Jdbc {     url = "jdbc:mysql://ip:13306/Cs1"     driver = "com.mysql.cj.jdbc.Driver"     user = "root"     password = "abc123"     query = "select * from t_8_100w"          # 并行读取配置     # 数值型主键字段     partition_column = "id"     # 分片数,匹配并行度     partition_num = 8     # partition_lower_bound = 1     # 可选:起始ID     # partition_upper_bound = 1000000 # 可选:结束ID     fetch_size = 500          # 连接参数     # 连接超时时间300ms     connection_check_timeout_sec = 300     properties = {       useUnicode = true       characterEncoding = "utf8"       serverTimezone = "Asia/Shanghai"       # 使用游标提高大结果集性能       useCursorFetch = "true"       # 每次获取行数       defaultFetchSize = "500"     }   } }  transform {}  sink {   jdbc {     url = "jdbc:mysql://ip:13306/Cs2"     driver = "com.mysql.cj.jdbc.Driver"     user = "root"     password = "abc123"     # query = "insert into test_table(name,age) values(?,?)"     # 生成自动插入sql。如果目标库没有表,也会自动建表     generate_sink_sql = true     # generate_sink_sql=true。所以:database必须要     database = Cs2     table = "t_8_100w_import"          # 批量写入条数     batch_size = 500     # 批次提交间隔     batch_interval_ms = 500     # 重试次数     max_retries = 3          # 连接参数     # 连接超时时间300ms     connection_check_timeout_sec = 300     properties = {       useUnicode = true       characterEncoding = "utf8"       serverTimezone = "Asia/Shanghai"              # 关键:启用批量重写       rewriteBatchedStatements = "true"       # 启用压缩       useCompression = "true"       # 禁用服务端预处理       useServerPrepStmts = "false"     }   } } 
  • 执行命令
./data/seatunnel/apache-seatunnel-2.3.12/bin/seatunnel.sh --config ./data/seatunnel/myconf/mysql2mysql.conf -m local  # 后台打印日志执行 nohup /data/seatunnel/apache-seatunnel-2.3.12/bin/seatunnel.sh --config ./data/seatunnel/myconf/mysql2mysql.conf -m local > /data/seatunnel/logs/seatunnel.log 2>&1 & 

2.5、DEMO3(mysql2mysql——字段映射、清洗、转换)

  • 等有空了再做

参考

https://www.jb51.net/program/2850931un.htm
https://www.cnblogs.com/robots2/p/17939863

发表评论

评论已关闭。

相关文章

当前内容话题