SeaTunnel部署及其Demo

- 从上图可以看出seatunnel的conf(执行命令文件),主要是source、transform、sink组成
- 📥 Source(数据源):作用:从各种数据源读取数据
- 🔄 Transform(数据转换):作用:对数据进行清洗、转换、加工
- 📤 Sink(数据目标):作用:将处理后的数据写入目标系统
1环境、安装下载(国内镜像下载)
- 环境准备:确保你的系统已安装 Java 8 或 11,并正确设置了 JAVA_HOME 环境变量。
1.1下载,解压
https://mirrors.tuna.tsinghua.edu.cn/apache/seatunnel/2.3.12/apache-seatunnel-2.3.12-bin.tar.gz tar -zxvf apache-seatunnel-2.3.12-bin.tar.gz
1.2下载插件(根据需要下载)
安装连接器插件:从2.2.0-beta版本开始,二进制包默认不包含连接器,需要手动安装。 进入解压后的SeaTunnel目录,执行安装脚本: 如果需要指定连接器版本(例如2.3.8),则执行 sh bin/install-plugin.sh 2.3.8。 你通常不需要全部连接器。可以编辑 config/plugin_config 文件,按格式(例如下方)指定所需插件。要让示例应用运行,通常需要 connector-fake 和 connector-console。 连接器插件的作用 连接器插件 = 数据源驱动程序 每个插件让 SeaTunnel 能够连接特定的数据源 MySQL 插件:连接 MySQL 数据库 Oracle 插件:连接 Oracle 数据库 Console 插件:输出到控制台 Fake 插件:生成测试数据
- 修改 config/plugin_config,只保留你需要的:(也可以不改,全部下载)
--connectors-v2-- connector-jdbc-mysql connector-jdbc-oracle connector-console # 这个建议保留,用于调试输出 --end--
- 安装插件,执行命令
# 进入 SeaTunnel 目录 sh bin/install-plugin.sh

1.3JVM参数的配置
- 编辑 bin/seatunnel.sh 文件,在文件开头附近添加或修改
export JVM_ARGS="-Xmx2g -Xms1g -XX:MaxDirectMemorySize=1g" # 其他配置
- 内存配置建议
| 机器内存 | JVM堆内存 | 并行度 | Batch Size |
|---|---|---|---|
| 4G | -Xmx2g -Xms1g | 2 | 500 |
| 8G | -Xmx4g -Xms2g | 4 | 1000 |
| 16G | -Xmx8g -Xms4g | 8 | 2000 |
| 32G | -Xmx16g -Xms8g | 16 | 5000 |
2使用
- mysql官方文档
https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Mysql
https://seatunnel.apache.org/docs/2.3.3/connector-v2/sink/Mysql
2.1、特殊参数说明
source
- 2.3.x 新版本:必须使用 query,不再支持 table 参数
sink
- generate_sink_sql = true:生成自动插入sql。如果目标库没有表,也会自动建表
2.2、lib下增加jdbc的包(mysql必须)
- 使用mysql的话,需要拷贝jar包到seatunnel安装包/lib 下
1、https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Mysql
2、https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.33/mysql-connector-java-8.0.33.jar

2.3、增加测试数据
-- mysql建表 CREATE TABLE `t_8_100w` ( `id` bigint NOT NULL COMMENT '主键', `name` varchar(2000) NULL COMMENT '名字', `sex` int null COMMENT '性别:1男;2女', `decimal_f` decimal(32, 6) NULL COMMENT '大数字', `phone_number` varchar(20) DEFAULT '13456780000' COMMENT '电话', `age` varchar(255) NULL COMMENT '字符串年龄转数字', `create_time` timestamp DEFAULT CURRENT_TIMESTAMP COMMENT '新增时间', `description` longtext NULL COMMENT '大文本', `address` varchar(2000) NULL COMMENT '空地址转默认值:未知', PRIMARY KEY (`id`) ); -- 新增存储过程 DELIMITER $$ CREATE PROCEDURE InsertMultipleRows_Batch( IN start_id INT, -- 起始ID IN end_id INT, -- 结束ID IN batch_size INT -- 批次大小 ) BEGIN DECLARE i INT DEFAULT start_id; DECLARE description_text LONGTEXT; DECLARE address_text VARCHAR(255); DECLARE sex_text INT; DECLARE total_to_insert INT; SET total_to_insert = end_id - start_id; -- 开始事务 START TRANSACTION; WHILE i < end_id DO -- 生成精确的1KB文本 SET description_text = REPEAT(CONCAT('DataX_Test_Text_', i, '_ABCDEFGHIJKLMN_'), 41); -- 根据i%2生成地址 IF i % 2 = 0 THEN SET address_text = CONCAT('地址', i); SET sex_text = 1; ELSE SET address_text = NULL; SET sex_text = 2; END IF; -- 插入数据 INSERT INTO t_8_100w (`id`, `name`, `sex`, `decimal_f`, `age`, `description`, `address`) VALUES ( i, CONCAT('名字', i), sex_text, i + 0.000001, ROUND((RAND() * 12) + 18), description_text, address_text ); SET i = i + 1; -- 每batch_size条提交一次 IF i % batch_size = 0 OR i = end_id THEN COMMIT; IF i < end_id THEN START TRANSACTION; END IF; -- 显示进度 IF i % 50000 = 0 OR i = end_id THEN SELECT CONCAT('批次 ', start_id, '-', end_id, ': 已插入 ', i - start_id, ' / ', total_to_insert, ' 条记录') AS progress; END IF; END IF; END WHILE; SELECT CONCAT('批次完成! ID范围: ', start_id, ' 到 ', end_id - 1, ' (共', total_to_insert, '条)') AS batch_complete; END$$ DELIMITER ; -- 分别执行新增数据 -- 测试1万条 CALL InsertMultipleRows_Batch(0, 10000, 500); -- 每10万条创建一次,分批执行 CALL InsertMultipleRows_Batch(10000, 100000, 1000); CALL InsertMultipleRows_Batch(100000, 200000, 1000); CALL InsertMultipleRows_Batch(200000, 300000, 1000); CALL InsertMultipleRows_Batch(300000, 400000, 1000); CALL InsertMultipleRows_Batch(400000, 500000, 1000); CALL InsertMultipleRows_Batch(500000, 600000, 1000); CALL InsertMultipleRows_Batch(600000, 700000, 1000); CALL InsertMultipleRows_Batch(700000, 800000, 1000); CALL InsertMultipleRows_Batch(800000, 900000, 1000); CALL InsertMultipleRows_Batch(900000, 1000000, 1000);
2.4、DEMO1(直接把采集数据打印到控制面板)
# test2mysql.conf - 测试源数据 env { # 并行度(线程数) execution.parallelism = 2 job.mode = "BATCH" } source { Jdbc { url = "jdbc:mysql://ip:port/Cs1" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "******" query = "select * from t_sea_01" # 连接参数 connection_check_timeout_sec = 300 properties = { useUnicode = true characterEncoding = "utf8" serverTimezone = "Asia/Shanghai" } } } sink { Console {} }
执行命令
./data/seatunnel/apache-seatunnel-2.3.12/bin/seatunnel.sh --config ./data/seatunnel/myconf/test2mysql.conf -m local
查看结果

真背CPU啊(2核云服务器)

2.5、DEMO2(mysql2mysql的不同库)
- 可以测试自动建表
-- 在mysql另一个数据库执行 CREATE TABLE `t_8_100w_import` ( `id` bigint NOT NULL COMMENT '主键', `name` varchar(2000) NULL COMMENT '名字', `sex` int null COMMENT '性别:1男;2女', `decimal_f` decimal(32, 6) NULL COMMENT '大数字', `phone_number` varchar(20) COMMENT '电话', `age` varchar(255) NULL COMMENT '字符串年龄转数字', `create_time` timestamp COMMENT '新增时间', `description` longtext NULL COMMENT '大文本', `address` varchar(2000) NULL COMMENT '空地址转默认值:未知', PRIMARY KEY (`id`) );
- conf文件
# mysql2mysql.conf # 2025-11-28 16:47:47 # env { execution.parallelism = 8 job.mode = "BATCH" } source { Jdbc { url = "jdbc:mysql://ip:13306/Cs1" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "abc123" query = "select * from t_8_100w" # 并行读取配置 # 数值型主键字段 partition_column = "id" # 分片数,匹配并行度 partition_num = 8 # partition_lower_bound = 1 # 可选:起始ID # partition_upper_bound = 1000000 # 可选:结束ID fetch_size = 500 # 连接参数 # 连接超时时间300ms connection_check_timeout_sec = 300 properties = { useUnicode = true characterEncoding = "utf8" serverTimezone = "Asia/Shanghai" # 使用游标提高大结果集性能 useCursorFetch = "true" # 每次获取行数 defaultFetchSize = "500" } } } transform {} sink { jdbc { url = "jdbc:mysql://ip:13306/Cs2" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "abc123" # query = "insert into test_table(name,age) values(?,?)" # 生成自动插入sql。如果目标库没有表,也会自动建表 generate_sink_sql = true # generate_sink_sql=true。所以:database必须要 database = Cs2 table = "t_8_100w_import" # 批量写入条数 batch_size = 500 # 批次提交间隔 batch_interval_ms = 500 # 重试次数 max_retries = 3 # 连接参数 # 连接超时时间300ms connection_check_timeout_sec = 300 properties = { useUnicode = true characterEncoding = "utf8" serverTimezone = "Asia/Shanghai" # 关键:启用批量重写 rewriteBatchedStatements = "true" # 启用压缩 useCompression = "true" # 禁用服务端预处理 useServerPrepStmts = "false" } } }
- 执行命令
./data/seatunnel/apache-seatunnel-2.3.12/bin/seatunnel.sh --config ./data/seatunnel/myconf/mysql2mysql.conf -m local # 后台打印日志执行 nohup /data/seatunnel/apache-seatunnel-2.3.12/bin/seatunnel.sh --config ./data/seatunnel/myconf/mysql2mysql.conf -m local > /data/seatunnel/logs/seatunnel.log 2>&1 &
2.5、DEMO3(mysql2mysql——字段映射、清洗、转换)
- 等有空了再做
参考
https://www.jb51.net/program/2850931un.htm
https://www.cnblogs.com/robots2/p/17939863