本文将介绍 Airflow 这一款优秀的调度工具。主要包括 Airflow 的服务构成、Airflow 的 Web 界面、DAG 配置、常用配置以及 Airflow DAG Creation Manager Plugin 这一款 Airflow 插件。


一、什么是 Airflow

Airflow 是 Airbnb 开源的一个用 Python 编写的调度工具。于 2014 年启动,2015 年春季开源,2016 年加入 Apache 软件基金会的孵化计划。

Airflow 通过 DAG 也即是有向非循环图来定义整个工作流,因而具有非常强大的表达能力。


如上图所示,一个工作流可以用一个 DAG 来表示,在 DAG 中将完整得记录整个工作流中每个作业之间的依赖关系、条件分支等内容,并可以记录运行状态。通过 DAG,我们可以精准的得到各个作业之间的依赖关系。

在进一步介绍 Airflow 之前,我想先介绍一些在 Airflow 中常见的名词概念:

  • DAG

    DAG 意为有向无循环图,在 Airflow 中则定义了整个完整的作业。同一个 DAG 中的所有 Task 拥有相同的调度时间。

  • Task

    Task 为 DAG 中具体的作业任务,它必须存在于某一个 DAG 之中。Task 在 DAG 中配置依赖关系,跨 DAG 的依赖是可行的,但是并不推荐。跨 DAG 依赖会导致 DAG 图的直观性降低,并给依赖管理带来麻烦。

  • DAG Run

    当一个 DAG 满足它的调度时间,或者被外部触发时,就会产生一个 DAG Run。可以理解为由 DAG 实例化的实例。

  • Task Instance

    当一个 Task 被调度启动时,就会产生一个 Task Instance。可以理解为由 Task 实例化的实例。

二、Airflow 的服务构成

一个正常运行的 Airflow 系统一般由以下几个服务构成

  • WebServer

    Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。

  • Worker

    一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。

  • Scheduler

    整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。

  • Flower

    Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。


三、Airflow 的 Web 界面

下面简单介绍一下 Airflow 的 Web 操作界面,从而可以对 Airflow 有一个更直观的了解。

1、DAG 列表

  1. 左侧 On/Off 按钮控制 DAG 的运行状态,Off 为暂停状态,On 为运行状态。注意:所有 DAG 脚本初次部署完成时均为 Off 状态。

  2. 若 DAG 名称处于不可点击状态,可能为 DAG 被删除或未载入。若 DAG 未载入,可点击右侧刷新按钮进行刷新。注意:由于可以部署若干 WebServer,所以单次刷新可能无法刷新所有 WebServer 缓存,可以尝试多次刷新。

  3. Recent Tasks 会显示最近一次 DAG Run(可以理解为 DAG 的执行记录)中 Task Instances(可以理解为作业的执行记录)的运行状态,如果 DAG Run 的状态为 running,此时显示最近完成的一次以及正在运行的 DAG Run 中所有 Task Instances 的状态。

  4. Last Run 显示最近一次的 execution date。注意:execution date 并不是真实执行时间,具体细节在下文 DAG 配置中详述。将鼠标移至 execution date 右侧 info 标记上,会显示 start date,start date 为真实运行时间。start date 一般为 execution date 所对应的下次执行时间。


2、作业操作框

在 DAG 的树状图和 DAG 图中都可以点击对应的 Task Instance 以弹出 Task Instance 模态框,以进行 Task Instance 的相关操作。注意:选择的 Task Instance 为对应 DAG Run 中的 Task Instance。


  1. 在作业名字的右边有一个漏斗符号,点击后整个 DAG 的界面将只显示该作业及该作业的依赖作业。当该作业所处的 DAG 较大时,此功能有较大的帮助。

  2. Task Instance Details 显示该 Task Instance 的详情,可以从中得知该 Task Instance 的当前状态,以及处于当前状态的原因。例如,若该 Task Instance 为 no status 状态,迟迟不进入 queued 及 running 状态,此时就可通过 Task Instance Details 中的 Dependency 及 Reason 得知原因。

  3. Rendered 显示该 Task Instance 被渲染后的命令。

  4. Run 指令可以直接执行当前作业。

  5. Clear 指令为清除当前 Task Instance 状态,清除任意一个 Task Instance 都会使当前 DAG Run 的状态变更为 running。注意:如果被清除的 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。Clear 有额外的5个选项,均为多选,这些选项从左到右依次为:

    • Past: 同时清除所有过去的 DAG Run 中此 Task Instance 所对应的 Task Instance。

    • Future: 同时清除所有未来的 DAG Run 中此 Task Instance 所对应的 Task Instance。注意:仅清除已生成的 DAG Run 中的 Task Instance。

    • Upstream: 同时清除该 DAG Run 中所有此 Task Instance 上游的 Task Instance。

    • Downstream: 同时清除该 DAG Run 中所有此 Task Instance 下游的 Task Instance。

    • Recursive: 当此 Task Instance 为 sub DAG 时,循环清除所有该 sub DAG 中的 Task Instance。注意:若当此 Task Instance 不是 sub DAG 则忽略此选项。

  6. Mark Success 指令为讲当前 Task Instance 状态标记为 success。注意:如果该 Task Instance 的状态为 running,则会尝试 kill 该 Task Instance 所执行指令,并进入 shutdown 状态,并在 kill 完成后将此次执行标记为 failed(如果 retry 次数没有用完,将标记为 up_for_retry)。


四、DAG 配置

Airflow 中的 DAG 是由 Python 脚本来配置的,因而可扩展性非常强。Airflow 提供了一些 DAG 例子,我们可以通过一个例子来简单得了解一下。

# -*- coding: utf-8 -*-

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG


args = {
   'owner': 'airflow',
   'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
   dag_id='example_bash_operator', default_args=args,
   schedule_interval='0 0 * * *')

cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)

run_this = BashOperator(
   task_id='run_after_loop', bash_command='echo 1', dag=dag)
run_this.set_downstream(run_this_last)

for i in range(3):
   i = str(i)
   task = BashOperator(
       task_id='runme_'+i,
       bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
       dag=dag)
   task.set_downstream(run_this)

task = BashOperator(
   task_id='also_run_this',
   bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
   dag=dag)
task.set_downstream(run_this_last)

我们可以看到,整个 DAG 的配置就是一份完整的 Python 代码,在代码中实例化 DAG,实例化适合的 Operator,并通过 set_downstream 等方法配置上下游依赖关系。下面我们简单看一下在 DAG 配置中的几个重要概念。

  • DAG

    要配置一个 DAG 自然需要一个 DAG 实例。在同一个 DAG 下的所有作业,都需要将它的 dag 属性设置为这个 DAG 实例。在实例化 DAG 时,通过传参数可以给这个 DAG 实例做一些必要的配置。

    • dag_id

      给 DAG 取一个名字,方便日后维护。

    • default_args

      默认参数,当属于这个 DAG 实例的作业没有配置相应参数时,将使用 DAG 实例的 default_args 中的相应参数。

    • schedule_interval

      配置 DAG 的执行周期,语法和 crontab 的一致。

  • 作业 (Task)

    Airflow 提供了很多 Operator,我们也可以自行编写新的 Operator。在本例中使用了 2 种 Operator,DummyOperator 什么都不会做, BashOperator 则会执行 bash_command 参数所指定的 bash 指令,并且使用 jinja2 模版引擎,对该指令进行渲染,因而在本例的 bash_command 中,可以看到一些需要渲染的变量。当 Operator 被实例化后,我们称之为相应 DAG 的一个作业(Task)。在实例化 Operator 时,同样可以通过穿参数进行必要的配置,值得注意的是,如果在 DAG 中有设置 default_args 而在 Operator 中没有覆盖相应配置,则会使用 default_args 中的配置。

    • dag

      传递一个 DAG 实例,以使当前作业属于相应 DAG。

    • task_id

      给作业去一个名字,方便日后维护。

    • owner

      作业的拥有者,方便作业维护。另外有些 Operator 会根据该参数实现相应的权限控制。

    • start_date

      作业的开始时间,即作业将在这个时间点以后开始调度。

  • 依赖

    配置以来的方法有两种,除了可以使用作业实例的 set_upstream 和 set_downstream 方法外,还可以使用类似

    task1 << task2 << task3
    task3 >> task4

    这样更直观的语法来设置。

这里我们要特别注意一个关于调度执行时间的问题。在谈这个问题前,我们先确定几个名词:

  • start date: 在配置中,它是作业开始调度时间。而在谈论执行状况时,它是调度开始时间。

  • schedule interval: 调度执行周期。

  • execution date: 执行时间,在 Airflow 中称之为执行时间,但其实它并不是真实的执行时间。

那么现在,让我们看一下当一个新配置的 DAG 生效后第一次调度会在什么时候。很多人会很自然的认为,第一次的调度时间当然是在作业中配置的 start date,但其实并不是。第一次调度时间是在作业中配置的 start date 的第二个满足 schedule interval 的时间点,并且记录的 execution date 为作业中配置的 start date 的第一个满足 schedule interval 的时间点。听起来很绕,让我们来举个例子。

假设我们配置了一个作业的 start date 为 2017年10月1日,配置的 schedule interval 为 00 12 * * * 那么第一次执行的时间将是 2017年10月2日 12点 而此时记录的 execution date 为 2017年10月1日 12点。因此 execution date 并不是如其字面说的表示执行时间,真正的执行时间是 execution date 所显示的时间的下一个满足 schedule interval 的时间点。

另外,当作业已经执行过之后,start date 的配置将不会再生效,这个作业的调度开始时间将直接按照上次调度所对应的 execution date 来计算。

这个例子只是简要的介绍了一下 DAG 的配置,也只介绍了非常少量的配置参数。Airflow 为 DAG 和作业提供了大量的可配置参数,详情可以参考 Airflow 官方文档。


五、常用配置

在日常工作中,有时候仅仅靠配置作业依赖和调度执行周期并不能满足一些复杂的需求。接下来将介绍一些常用的作业配置。

1、跳过非最新 DAG Run

假如有一个每小时调度的 DAG 出错了,我们把它的调度暂停,之后花了3个小时修复了它,修复完成后重新启动这个作业的调度。于是 Airflow 一下子创建了 3 个 DAG Run 并同时执行,这显然不是我们希望的,我们希望它只执行最新的 DAG Run。

我们可以创建一个 Short Circuit Operator,并且让 DAG 中所有没有依赖的作业都依赖这个作业,然后在这个作业中进行判断,检测当前 DAG Run 是否为最新,不是最新的直接跳过整个 DAG。

def skip_dag_not_latest_worker(ds, **context):
   if context['dag_run'] and context['dag_run'].external_trigger:
       logging.info('Externally triggered DAG_Run: allowing execution to proceed.')
       return True

   skip = False
   now = datetime.now()
   left_window = context['dag'].following_schedule(context['execution_date'])
   right_window = context['dag'].following_schedule(left_window)
   logging.info('Checking latest only with left_window: %s right_window: %s now: %s', left_window, right_window, now)

   if not left_window < now <= right_window:
       skip = True
   return not skip

ShortCircuitOperator(
   task_id='skip_dag_not_latest',
   provide_context=True,
   python_callable=skip_dag_not_latest_worker,
   dag=dag
)

2、当存在正在执行的 DAG Run 时跳过当前 DAG Run

依旧是之前提到的每小时调度的 DAG,假设它这次没有出错而是由于资源、网络或者其他问题导致执行时间变长,当下一个调度时间开始时 Airflow 依旧会启动一次新的 DAG Run,这样就会同时出现 2 个 DAG Run。如果我们想要避免这种情况,一个简单的方法是直接将 DAG 的 max_active_runs 设置为 1。但这样会导致 DAG Run 堆积的问题,如果你配置的调度是早上 9 点至晚上 9 点,直至晚上 9 点之后 Airflow 可能依旧在处理堆积的 DAG Run。这样就可能影响到我们原本安排在晚上 9 点之后的任务。

我们可以创建一个 Short Circuit Operator,并且让 DAG 中所有没有依赖的作业都依赖这个作业,然后在这个作业中进行判断,检测当前是否存在正在执行的 DAG Run,存在时则直接跳过整个 DAG。

def skip_dag_when_previous_running_worker(ds, **context):
   if context['dag_run'] and context['dag_run'].external_trigger:
       logging.info('Externally triggered DAG_Run: allowing execution to proceed.')
       return True

   skip = False
   session = settings.Session()
   count = session.query(DagRun).filter(
       DagRun.dag_id == context['dag'].dag_id,
       DagRun.state.in_(['running']),
   ).count()
   session.close()
   logging.info('Checking running DAG count: %s' % count)
   skip = count > 1
   return not skip

ShortCircuitOperator(
   task_id='skip_dag_when_previous_running',
   provide_context=True,
   python_callable=skip_dag_when_previous_running_worker,
   dag=dag
)


3、Sensor 的替代方案

Airflow 中有一类 Operator 被称为 Sensor,Sensor 可以感应预先设定的条件是否满足(如:某个时间点是否达到、某条 MySQL 记录是否被更新、某个 DAG 是否完成),当满足条件后 Sensor 作业变为 Success 使得下游的作业能够执行。Sensor 的功能很强大但却带来一个问题,假如我们有一个 Sensor 用于检测某个 MySQL 记录是否被更新,在 Sensor 作业启动后 3 个小时这个 MySQL 记录才被更新。于是我们的这个 Sensor 占用了一个 Worker 整整 3 小时,这显然是一个极大的浪费。

因此我们需要一个 Sensor 的替代方案,既能满足 Sensor 原来的功能,又能节省 Worker 资源。有一个办法是不使用 Sensor,直接使用 Python Operator 判断预先设定的条件是否满足,如果不满足直接 raise Exception,然后将这个作业的 retry_delay(重试间隔时间) 设为每次检测的间隔时间,retries(重试次数) 设为最长检测时间除以 retry_delay,即满足:最长检测时间 = retries * retry_delay。这样既不会长时间占用 Worker 资源,又可以满足 Sensor 原来的功能。


六、Airflow DAG Creation Manager Plugin

正如上两章所描述的,Airflow 虽然具有强大的功能,但是配置 DAG 并不是简单的工作,也有一些较为繁琐的概念,对于业务人员来说可能略显复杂。因此,笔者编写了 Airflow DAG Creation Manager Plugin(https://github.com/lattebank/airflow-dag-creation-manager-plugin)以提供一个 Web界面来让业务人员可视化的编写及管理 DAG。具体的安装及使用方法请查看插件的README。


如上图所示,插件的 Web 界面中可以直接所见即所得的编写 DAG 图。

插件中尽量简化了一些繁琐的诸如上文所述的作业开始调度时间等一系列的概念,并提供了一些在实际工作中常常会用到的一些额外的功能(如上文提到的跳过非最新 DAG Run、当存在正在执行的 DAG Run 时跳过当前 DAG Run 等),以及版本控制和权限管理。如果大家在使用 Airflow 的过程中也有类似的问题,欢迎尝试使用 Airflow DAG Creation Manager Plugin。


七、总结

Airflow 适用于调度作业较为复杂,特别是各作业之间的依赖关系复杂的情况。

希望本文能让大家对 Airflow 有所了解,并能将 Airflow 运用到适合它使用的场景中。



本文作者:杨涵冰   

文章来源:澳门威斯尼斯人技术团队公众号【泛金融技术】,欢迎关注、投稿、交流。

转载声明:未经授权不得转载,授权后转载请注明出处并附上原文链接。