dolphinscheduler简单任务定义及跨节点传参
转载请注明出处 https://www.cnblogs.com/funnyzpc/p/16395094.html
写在前面
dolphinscheduler
是一款非常不错的调度工具,本文我就简称ds啦,可单机可集群可容器,可调度sql
、存储过程
、http
、大数据
,也可使用shell
、python
、java
、flink
等语言及工具,功能强大类型丰富,适合各类调度型任务,社区及项目也十分活跃,现在github中已有8.2k的star👍
所以,本篇博文开始会逐步讲一些ds相关的东西,也期待各位同行能接触到此并能实际解决一些生产上的问题~😁
一.准备工作
阅读本博文前建议您先阅读下官方的文档[https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(虽然也会碰到一些坑😂) 这里,先准备下sql表资源,以下为`postgresql`的`sql`脚本:
(表结构)
CREATE TABLE dolphinscheduler.tmp ( id int4 NOT NULL, "name" varchar(50) NULL, "label" varchar(50) NULL, update_time timestamp NULL, score int4 NULL, CONSTRAINT tmp_pkey PRIMARY KEY (id) );
(表数据)
INSERT INTO tmp (id,"name","label",update_time,score) VALUES (3,'二狗子','','2022-07-06 21:49:26.872',NULL), (2,'马云云','',NULL,NULL), (1,'李思','','2022-07-05 19:54:31.880',85);
因为个人使用的postgresql
的数据库,如果您是mysql
或者其他数据的用户,请自行更改以上表和数据并添加到库中即可
表及数据入库,请将tmp所属的库配置到 ds后台
->数据源中心
->创建数据源
,以下是我的配置,记住,这里面的所有数据库配置均遵守所属数据库类型的jdbc
的driver
的配置参数,配置完成也会在ds的数据库生成一条jdbc
的连接地址,这点要明白~
二.简单的项目创建及说明
因为`ds`的任务是配置在项目下面,所以第一步得新建一个项目,这样:`ds后台`->`项目管理`->`创建项目`,这是我创建的,请看:
准备完项目之后,鼠标点进去,并进入到 工作流定义
菜单 页面,如下图:
先简单到解释下ds
的一点儿基本结构,首先,ds
一般部署在linux
服务器下,创建任务的用户需要在admin账户
下创建,重要的是创建的每个工作账户需要与操作系统用户一一对应,比如你创建了一个 test 的ds账户,那ds所在的服务器也必须有一个test的账户才可行,这是ds
的规则,我没法解释为什么。
每个用户下(除了admin外)所能创建的调度任务均在各自创建的项目下,每个项目又分为多个任务(工作流定义),一个任务下又可分为多个任务节点,下图为任务定义:
ok,如果已经准备好以上步骤,下面开始定义一个简单的调度任务,继续哈~
三.简单的参数传递
先看表:
我们先做个简单的,比如图中,如果二狗子的本名叫:李思,需要我们取id=1
的name
放到id=3
的label
中,并且更新update_time
-
1.这里第一步 在工作流定义列表,点击
创建工作流
就进入一个具体的任务(工作流)的定义,同时我们使用的是sql任务
,所以就需要从左侧拖动一个sql任务
到画布中(右侧空白处):
因为拖动sql任务
到画布会自动弹出节点定义,上图为当前节点的一个定义,重点是:数据源
、sql类型
、sql语句
,如官方所说,如果将name
传递到下游,则需要在自定义参数重定义这个name
为out
方向类型
为varchar
。 -
2.因为传递到参数需要写入到表,这里我们再定义一个节点,这个节点负责接收上游传递到
name
,执行update
时使用这个name
,以下是我的定义:
看到没,这里不仅仅要注意sql类型
(sql类型
与sql语句
是一一对应
的,类型不能错) ,还有就是前置任务一定要选中(上面定义的)node1
节点。
另外,需要注意的是当前任务是上下游传参,所以在node2
中是直接使用node1
中定义的name
这个参数哈 -
3.定义完成当前任务就需要保存:点右上角保存,填写并保存后点关闭以退出定义:
-
4.因为定义的任务需要上线了才可执行,所以,在工作流定义列表先点该任务的
黄色按钮
(任务上线),然后才是点绿色按钮
(执行任务):
-
5.任务执行成功与否,具体得看任务实例,这是执行
node2节点
的日志:
顺带再看看数据库表是否真实成功:
完美😊
四.复杂的跨节点传参
首先看表:
思考一个问题:可以看到李思
的score
是85
,根据score
应该被评为 B
(>=90的为A)并写入到label
字段,该怎么办呢,如果这个分数是90分又该怎么办呢,如果根本没有score(分值) 这个任务是不是就不需要更新李思的label(评分)呢?
对于上面问题可以有一些偏门的解决方法,比如在sql
中塞一个异常值
,这样看似不错,不过作为调度工具建议还是在condition节点
或者switch节点
处理是最好的,不过就目前我用的2.0.5
版本的ds
对于这两类任务节点是没法接收参数
的,这是一个遗憾;遂~个人觉得较好的方式是在写入节点之前增加一个判断节点
,将错误抛出(没有score的)最好~,对于此,我使用了一个shell的中间节点
。
下面是我定义的三个节点:
-
node1节点定义:
-
node2节点定义:
(脚本内容)
#!/bin/bash echo "=====>input param start<=====" echo "id=${id}" echo "score=${score}" echo "=====>input param end<=====" id=${id} echo '${setValue(id2='$id')}' if [ "${score}" -ge "90" ];then echo '${setValue(label2=level A)}' echo "level A" elif [ "${score}" -ge "80" ];then echo '${setValue(label2=level B)}' echo "level B" elif [ "${score}" -ge "60" ];then echo '${setValue(label2=level C)}' echo "level C" elif [ "${score}" -ge "0" ];then echo '${setValue(label2=F!)}' echo "F!" else echo "NO score ,please check!" exit 1 fi
-
node3节点
定义:
-
看一眼结果🤓:
五.中间的坑
对于复杂节点传参数也碰到一些坑,这些坑大概有这些:
- 1.对于
shell
脚本不熟悉的,判断节点其实还是有一些难度的,这是很重要的一点 - 2.
node2
(判断节点)不能有重复的参数,不管局部的还是node1
(上一级)传递过来的,均不能重复
- 3.因为在
node2
(判断节点)需要将id
以及label
继续往下传(tonode3
),这时候就需要给id
以及label
定义一个映射的out
变量(id2
、label2
) - 4.
node2
中重新设置参数麻烦
,需要在shell
中重新定义变量(id2
、label2
),同时需要在shell任务
内使用拼接的方式赋值(如:echo '${setValue(id2='$id')}'
) - 5.
sql类型
以及不同节点下不同参数时常搞错,不是任何节点都可以接收上级节点
参数,以及局部变量
与传递变量
以及全局变量
优先级区别及可能造成冲突 - 6.ds
列表传参
(2.0是不可以的)很鸡肋
,对于列表传参
又不能在下一级节点做循环赋值
,这点对于ds是有改进的空间的 - 7.等等...
对于
ds
还有很多可扩展的地方(因为实际需要),所以我就做了一些二次开发😂,后面会聊...大家期待哟😚