[TOC]
1. 简介
Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。
Airflow 中常见的名词概念:
DAG
DAG 意为有向无循环图,在 Airflow 中则定义了整个完整的作业。同一个 DAG 中的所有 Task 拥有相同的调度时间。
Task
Task 为 DAG 中具体的作业任务,它必须存在于某一个 DAG 之中。Task 在 DAG 中配置依赖关系,跨 DAG 的依赖是可行的,但是并不推荐。跨 DAG 依赖会导致 DAG 图的直观性降低,并给依赖管理带来麻烦。
DAG Run
当一个 DAG 满足它的调度时间,或者被外部触发时,就会产生一个 DAG Run。可以理解为由 DAG 实例化的实例。
Task Instance
当一个 Task 被调度启动时,就会产生一个 Task Instance。可以理解为由 Task 实例化的实例。
2、Airflow 的服务构成
一个正常运行的 Airflow 系统一般由以下几个服务构成
WebServer
Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。
Worker
一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。
Scheduler
整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。
Flower
Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。
3. Airflow 的 Web 界面
1、DAG 列表
DAG 列表
- 左侧 On/Off 按钮控制 DAG 的运行状态,Off 为暂停状态,On 为运行状态。注意:所有 DAG 脚本初次部署完成时均为 Off 状态。
- 若 DAG 名称处于不可点击状态,可能为 DAG 被删除或未载入。若 DAG 未载入,可点击右侧刷新按钮进行刷新。注意:由于可以部署若干 WebServer,所以单次刷新可能无法刷新所有 WebServer 缓存,可以尝试多次刷新。
- Recent Tasks 会显示最近一次 DAG Run(可以理解为 DAG 的执行记录)中 Task Instances(可以理解为作业的执行记录)的运行状态,如果 DAG Run 的状态为 running,此时显示最近完成的一次以及正在运行的 DAG Run 中所有 Task Instances 的状态。
- Last Run 显示最近一次的 execution date。注意:execution date 并不是真实执行时间,具体细节在下文 DAG 配置中详述。将鼠标移至 execution date 右侧 info 标记上,会显示 start date,start date 为真实运行时间。start date 一般为 execution date 所对应的下次执行时间。
2、作业操作框
在 DAG 的树状图和 DAG 图中都可以点击对应的 Task Instance 以弹出 Task Instance 模态框,以进行 Task Instance 的相关操作。注意:选择的 Task Instance 为对应 DAG Run 中的 Task Instance。
作业操作框
- 在作业名字的右边有一个漏斗符号,点击后整个 DAG 的界面将只显示该作业及该作业的依赖作业。当该作业所处的 DAG 较大时,此功能有较大的帮助。
- Task Instance Details 显示该 Task Instance 的详情,可以从中得知该 Task Instance 的当前状态,以及处于当前状态的原因。例如,若该 Task Instance 为 no status 状态,迟迟不进入 queued 及 running 状态,此时就可通过 Task Instance Details 中的 Dependency 及 Reason 得知原因。
- Rendered 显示该 Task Instance 被渲染后的命令。
- Run 指令可以直接执行当前作业。
- Clear 指令为清除当前 Task Instance 状态,清除任意一个 Task Instance 都会使当前 DAG Run 的状态变更为 running。注意:如果被清除的 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。Clear 有额外的5个选项,均为多选,这些选项从左到右依次为:
- Past: 同时清除所有过去的 DAG Run 中此 Task Instance 所对应的 Task Instance。
- Future: 同时清除所有未来的 DAG Run 中此 Task Instance 所对应的 Task Instance。注意:仅清除已生成的 DAG Run 中的 Task Instance。
- Upstream: 同时清除该 DAG Run 中所有此 Task Instance 上游的 Task Instance。
- Downstream: 同时清除该 DAG Run 中所有此 Task Instance 下游的 Task Instance。
- Recursive: 当此 Task Instance 为 sub DAG 时,循环清除所有该 sub DAG 中的 Task Instance。注意:若当此 Task Instance 不是 sub DAG 则忽略此选项。
- Mark Success 指令为讲当前 Task Instance 状态标记为 success。注意:如果该 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。
4. DAG 配置
Airflow 中的 DAG 是由 Python 脚本来配置的,因而可扩展性非常强。Airflow 提供了一些 DAG 例子,
# -*- coding: utf-8 -*-
import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2) # 作业的开始时间,即作业将在这个时间点以后开始调度。
}
dag = DAG(
dag_id='example_bash_operator', # 给 DAG 取一个名字,不能重复
default_args=args,
schedule_interval='0 0 * * *' # 配置 DAG 的执行周期,语法和 crontab 的一致
)
cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)
run_this = BashOperator(
task_id='run_after_loop', bash_command='echo 1', dag=dag)
run_this.set_downstream(run_this_last)
for i in range(3):
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
dag=dag)
task.set_downstream(run_this)
task = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag)
# 用[]可以并行执行多个job, task >> [job1,job2]
# 等同于 task.set_downstream(run_this_last)
task >> run_this_last
那么现在,让我们看一下当一个新配置的 DAG 生效后第一次调度会在什么时候。其实第一次调度时间是在作业中配置的 start date 的第二个满足 schedule interval 的时间点,并且记录的 execution date 为作业中配置的 start date 的第一个满足 schedule interval 的时间点.
假设我们配置了一个作业的 start date 为 2017年10月1日,配置的 schedule interval 为 **00 12 * * *** 那么第一次执行的时间将是 2017年10月2日 12点 而此时记录的 execution date 为 2017年10月1日 12点。因此 execution date 并不是如其字面说的表示执行时间,真正的执行时间是 execution date 所显示的时间的下一个满足 schedule interval 的时间点。