调度系统-Airflow

调度系统的选型

现在的开源调度系统分为两类:

  • 以 定时类调度系统,设计核心是定时运行、数据分片和弹性扩容,但是对依赖关系支持的不太友好,更适用于后端业务开发,其代表为 XXL-JOBElastic-Job

  • 以 DAG 为核心的工作流调度系统,强调的是任务的依赖关系, 常见有:

    1. Oozie:Oozie 是基于 XML 格式进行开发的,后续集成到 Hue 里可以可视化配置,但是缺点也很明显,版本管理、日志收集都不太友好,开发灵活性很差,可调度的任务也很少,另外定义过于复杂,维护成本很高。当然最核心还是没有共用变量和共用连接信息的概念。
    2. Azkaban:和 Oozie 差不多,最核心问题还是没有共用变量和共用连接信息的概念
    3. Airflow
    4. Dolphinscheduler: 刚开源不久的 Apache 孵化项目,国人开发和贡献的

Airflow基本使用

基本概念

  • DAG :意为有向无循环图,在 Airflow 中则定义了整个完整的作业。同一个 DAG 中的所有 Task 拥有相同的调度时间。

  • Task: 为 DAG 中具体的作业任务,它必须存在于某一个 DAG 之中。Task 在 DAG 中配置依赖关系,跨 DAG 的依赖是可行的,但是并不推荐。跨 DAG 依赖会导致 DAG 图的直观性降低,并给依赖管理带来麻烦。

  • DAG Run: 当一个 DAG 满足它的调度时间,或者被外部触发时,就会产生一个 DAG Run。可以理解为由 DAG 实例化的实例。

  • Task Instance:当一个 Task 被调度启动时,就会产生一个 Task Instance。可以理解为由 Task 实例化的实例

基本架构

Airflow 是建立在元数据库上的队列系统。数据库存储队列任务的状态,调度器使用这些状态来确定如何将其它任务添加到队列的优先级。此功能由四个主要组件编排:

  • 元数据库:这个数据库存储有关任务状态的信息。数据库使用在 SQLAlchemy 中实现的抽象层执行更新。该抽象层将 Airflow 剩余组件功能从数据库中干净地分离了出来。

  • 调度器:调度器是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。

  • 执行器:Excutor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。

  • Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。

Airflow 的操作建立于存储任务状态和工作流的元数据库之上(即 DAG)。调度器和执行器将任务发送至队列,让 Worker 进程执行。WebServer 运行(经常与调度器在同一台机器上运行)并与数据库通信,在 Web UI 中呈现任务状态和任务执行日志。每个有色框表明每个组件都可以独立于其他组件存在,这取决于部署配置的类型

安装部署

  1. pip安装: pip install apache-airflow

  2. 修改环境变量新建目录指定为AIRFLOW_HOME

  3. 首次执行airflow命令会在AIRFLOW_HOME目录下初始化生成airflow.cfg文件

    1
    [root@server ~]# airflow
  4. mysql新建数据库airflow并配置权限

    1
    2
    mysql > create database airflow default character set utf8;
    mysql > grant all on *.* to airflow@localhost identified by 'airflow_test';
  5. 初始化数据库airflow

    1
    [root@server ~]# airflow initdb
  6. 修改airflow.cfg

    1
    2
    3
    4
    web_server_host = IP
    web_server_port = HOST
    executor = LocalExecutor
    sql_alchemy_conn = mysql数据库地址
  7. 启动守护进程(利用supervisord后台挂起)

    1
    2
    [root@server ~]# airflow webserver
    [root@server ~]# airflow sheduler

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
dag = DAG(
'test',
default_args=default_args,
description='my first DAG',
schedule_interval='50 * * * *')
)

# examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)

t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)

t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)

# Setting up Dependencies
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

踩坑

时区问题

系统启动的时间,以及写入数据库元数据的时间确是可以配置时区的, 可以通过配置 airflow 的配置文件 airflow.cfg 改变这个时区. 但是在airflow 提供的 webserver 上, 即web界面上我们看到的时间默认都是 UTC+0 的时间。因此就会存在实际调度时间与web页面显示的不一样,解决方案可以参考一下这篇博客airflow 修改中国时区(改airflow源码)

Backfill

Backfill,大致来讲就是在某些时候因为服务器宕机或者其他原因导致有些任务没有调度,然后重启后 airflow 自动回补执行去调度这些任务,可以使用手动方法来执行这个行为:

1
[root@server ~]# airflow backfill sensors -s 2015-06-01 -e 2015-06-07

可以看做一个类似断点恢复机制,其实是一个很好的功能,但在自动触发的时候,因为airflow会默认去回补执行从系统当前时间到我们指定的 start_date (上面配置 dag 的时候指定的一个参数)期间的任务,那么在有些时候就会出现不可预期的问题.
举个例子:
我设置了这样一批任务,每半小时执行一次调度,然后因为服务器宕机一段时间了,恢复后 airflow 自动触发了 Backfill,系统当前时间到我们指定的 start_date时间段还比较长,堆积了很多任务,然后开始回补执行,半个小时我还没回补执行完第一批任务,然后因为间隔半小时我又有一批新任务加入了进来,可能最后就会因为任务堆积的原因导致跑任务的服务器出现问题

并发调度问题

某次因为scheduler 进程挂掉了,重启起来发现全部任务被一块执行了。甚至每个 dag 多天没有跑完的任务直接起来,导致服务器一下压力太大。
修改全局文件 airflow.cfg 的参数降低并发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

补充

airflow 的守护进程工作机制:

  • 调度器 scheduler 会间隔性的去轮询元数据库(Metastore)已注册的 DAG(有向无环图,可理解为作业流)是否需要被执行。如果一个具体的 DAG 根据其调度计划需要被执行,scheduler 守护进程就会先在元数据库创建一个 DagRun 的实例,并触发 DAG 内部的具体 task(任务,可以这样理解:DAG 包含一个或多个task),触发其实并不是真正的去执行任务,而是推送 task 消息至消息队列(即 broker)中,每一个 task 消息都包含此 task 的 DAG ID,task ID,及具体需要被执行的函数。如果 task 是要执行 bash 脚本,那么 task 消息还会包含 bash 脚本的代码。

  • 用户可能在 webserver 上来控制 DAG,比如手动触发一个 DAG 去执行。当用户这样做的时候,一个DagRun 的实例将在元数据库被创建,scheduler 使同 #1 一样的方法去触发 DAG 中具体的 task 。

  • worker 守护进程将会监听消息队列,如果有消息就从消息队列中取出消息,当取出任务消息时,它会更新元数据中的 DagRun 实例的状态为正在运行,并尝试执行 DAG 中的 task,如果 DAG 执行成功,则更新任 DagRun 实例的状态为成功,否则更新状态为失败

参考链接