This article is intended for both Airflow beginners and veterans. It presents the fundamental objects of this technology and how they interface with Saagie's DataOps platform. We will not explain once again how to create or schedule a Directed Acyclic Graph (commonly called a DAG), since many tutorials and resources already cover that; we will only briefly recap the concepts used later on. The focus here is on how to interact with Saagie from Airflow.
An introduction to Apache Airflow
Airflow is a scheduling solution for coding, planning and monitoring your processing workflows. Airbnb developed it for its internal needs and open-sourced it in 2015; the project entered the Apache Incubator in 2016 and became a top-level Apache Software Foundation project in January 2019. Many companies now use it to manage, schedule and monitor their workflows.
Airflow lets you create your workflows in the form of a DAG (Directed Acyclic Graph). You define a set of operations, some executed in sequence and others in parallel. There is no interactive interface for building a DAG: you write Python code to configure your operations and define the DAG. Don't worry, very little knowledge of the language is needed to build simple workflows. Airflow also provides a web interface that makes it easier to monitor your pipelines.
DAG – Directed Acyclic Graph
As mentioned above, Airflow runs your tasks in an order that you define. You can make tasks depend on each other to form a sequence, or configure them to run in parallel. Note that a DAG only describes how tasks are organized and scheduled: it does not do any processing itself, which is the job of the tasks it contains.
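The idea that a DAG fixes an order, with some tasks in sequence and others in parallel, can be illustrated with a toy sketch in plain Python (3.9+ for `graphlib`). This is not Airflow's API or its scheduling algorithm, and the task names are hypothetical. The function groups tasks into waves in which every task only depends on tasks from earlier waves, so the tasks within one wave could run in parallel.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies):
    """Group tasks into waves; tasks in the same wave have no dependency on each other."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical pipeline: each task maps to the set of tasks it depends on.
pipeline = {
    "extract": set(),
    "clean_sales": {"extract"},
    "clean_customers": {"extract"},
    "report": {"clean_sales", "clean_customers"},
}
print(execution_waves(pipeline))
# [['extract'], ['clean_customers', 'clean_sales'], ['report']]
```

The "acyclic" part matters: a cycle such as `{"a": {"b"}, "b": {"a"}}` has no valid order, and `prepare()` rejects it.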
Operators and Sensors
These tasks can be of two different kinds:
- Operators execute an operation, for example running Python code, creating an EC2 instance on Amazon Web Services or running an HQL query on Hive. Many operators are available out of the box in Airflow.
- Sensors pause execution until one or more conditions are met, for example until a file has been uploaded to HDFS or until a given amount of time has elapsed. Airflow likewise ships with a number of sensors.
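The behavior of a sensor can be sketched as a polling loop: check a condition and, if it is not met, wait and check again until a timeout expires. The function below is a hypothetical, stdlib-only illustration, not Airflow code; its `poke_interval` and `timeout` arguments only mirror the names of the corresponding `BaseSensorOperator` parameters.

```python
import time


def wait_for(condition, poke_interval=0.01, timeout=1.0):
    """Toy sensor: call `condition` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met within %.2f s" % timeout)
        time.sleep(poke_interval)  # pause before the next check


# Hypothetical condition standing in for "the file has landed on HDFS":
# it becomes true on the third check.
calls = {"n": 0}


def file_has_landed():
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_for(file_has_landed))  # True
print(calls["n"])                 # 3
```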
Once the tasks are defined, they need to be executed, which is the role of the executor. Airflow offers several executors; the three main ones are:
- The SequentialExecutor, as its name suggests, runs tasks one after the other. In this mode, tasks cannot run in parallel.
- The LocalExecutor runs several tasks in parallel on a single machine (vertical scaling).
- The CeleryExecutor allows horizontal scaling: once a Celery cluster is set up, tasks are sent to its workers, which are in charge of executing them.
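The difference between the SequentialExecutor and the LocalExecutor can be pictured with the standard library: run tasks one at a time, or hand them to a pool of workers on the same machine. This is only an analogy with hypothetical tasks. The real LocalExecutor runs tasks in separate processes; threads are used here to keep the sketch short.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def task(name, seconds=0.1):
    time.sleep(seconds)  # stand-in for real work
    return name


def run_sequential(names):
    # One task after the other, like the SequentialExecutor.
    return [task(n) for n in names]


def run_parallel(names, slots=4):
    # Up to `slots` tasks at once on one machine, loosely like the LocalExecutor.
    with ThreadPoolExecutor(max_workers=slots) as pool:
        return list(pool.map(task, names))  # results keep the input order


names = ["a", "b", "c", "d"]
for runner in (run_sequential, run_parallel):
    start = time.perf_counter()
    result = runner(names)
    print(runner.__name__, result, "%.2f s" % (time.perf_counter() - start))
```

With four 0.1 s tasks, the sequential run takes about four times as long as the parallel one; the CeleryExecutor extends the same idea to workers spread over several machines.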
Note that you need a running Airflow scheduler: it reads the DAGs placed in its DAG folder and hands the tasks to be performed over to the executor. If you are already a Saagie user, you can also containerize Airflow by loading its Docker image into your project. You then benefit from both Saagie's orchestration and Airflow's scheduling capabilities. These two notions should not be confused. Scheduling means running tasks as conditional pipelines (DAGs), whereas orchestration, as Saagie defines it, combines three capabilities: the scheduling just described, the instantiation of containers and the allocation of hardware resources.
How to use Airflow with Saagie?
Let's start with Airflow plugins. A plugin lets you add new operators, sensors and other components that interact with your ecosystem, which makes it an easy way to create and share new features.
Saagie provides a REST API for interacting with the platform: for example, you can update or launch jobs, or create environment variables. We will use this API to build our own Airflow operators and sensors.
On our side, we have written a Python wrapper around this API (QuerySaagieApi), which simplifies writing the operators and sensors.
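The wrapper itself is not reproduced in this article. For readers who want to write their own, here is a minimal, stdlib-only sketch of what such a wrapper could look like. Everything in it is an assumption: the class name, HTTP basic authentication and the placeholder endpoint path do not come from Saagie's API documentation. The usage only builds a request, so no network call is made.

```python
import base64
import json
import urllib.request


class HypotheticalSaagieClient:
    """Minimal REST wrapper sketch. Basic auth and all endpoint paths are
    assumptions for illustration, not Saagie's documented API."""

    def __init__(self, base_url, user, password):
        self.base_url = base_url.rstrip("/")
        token = base64.b64encode(f"{user}:{password}".encode()).decode()
        self.headers = {"Authorization": "Basic " + token,
                        "Accept": "application/json"}

    def build_request(self, method, path, payload=None):
        """Build (but do not send) a request to base_url/path, with an optional JSON body."""
        headers = dict(self.headers)
        data = None
        if payload is not None:
            data = json.dumps(payload).encode()
            headers["Content-Type"] = "application/json"
        url = self.base_url + "/" + path.lstrip("/")
        return urllib.request.Request(url, data=data, headers=headers, method=method)

    def send(self, request, timeout=30):
        """Send the request and return (status, body); urllib raises HTTPError on 4xx/5xx."""
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status, response.read().decode()


# "jobs/42/run" is a made-up placeholder path.
client = HypotheticalSaagieClient("https://saagie.example.com/api/", "alice", "secret")
req = client.build_request("POST", "jobs/42/run")
print(req.get_method(), req.full_url)
# POST https://saagie.example.com/api/jobs/42/run
```

Separating `build_request` from `send` keeps the request construction testable without a live platform.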
First, we create an operator that runs a job in a Saagie environment. To do so, we define a new class that extends BaseOperator and implements its execute method. To package it as a plugin, we also define a SaagiePlugin class that derives from airflow.plugins_manager.AirflowPlugin; this class lists the objects (operators, sensors) that the plugin exposes. Once written, the plugin file must be placed in the $AIRFLOW_HOME/plugins directory.
```python
import json
import logging

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.sensors.base_sensor_operator import BaseSensorOperator

from querySaagieApi import QuerySaagieApi  # our wrapper around the Saagie REST API

log = logging.getLogger(__name__)


class SaagieOperator(BaseOperator):
    """Launches a Saagie job through the REST API."""

    def __init__(self, user, password, url_saagie, id_platform, job_id, *args, **kwargs):
        super(SaagieOperator, self).__init__(*args, **kwargs)
        self.user = user
        self.password = password
        self.url_saagie = url_saagie
        self.id_platform = id_platform
        self.job_id = job_id
        self.qsa = QuerySaagieApi(url_saagie, id_platform, user, password)

    def execute(self, context):
        res = self.qsa.run_job(self.job_id)
        # Any answer other than 204 means the launch request was rejected
        if res.status_code != 204:
            raise AirflowException("Did not manage to launch the job, please check your parameters.")
        log.info("Job %s is launched", self.job_id)


# Keep this class at the bottom of the plugin file: it references SaagieSensor,
# which is defined in the next snippet and must exist before this class is created.
class SaagiePlugin(AirflowPlugin):
    name = "saagie_plugin"
    operators = [SaagieOperator]
    sensors = [SaagieSensor]
```
Now that the operator is in place, we want to know the job's status and continue with the next tasks of the DAG only if the job succeeded. This is the role of a sensor: we create a new class that extends BaseSensorOperator and implement its poke method. Airflow calls poke repeatedly, waiting poke_interval seconds between calls (60 by default), until it returns True or the sensor's timeout is reached.
```python
# Same plugin file: imports, QuerySaagieApi and `log` are defined above.
class SaagieSensor(BaseSensorOperator):
    """Waits for a Saagie job to stop, and fails if it did not succeed."""

    def __init__(self, user, password, url_saagie, id_platform, job_id, *args, **kwargs):
        super(SaagieSensor, self).__init__(*args, **kwargs)
        self.user = user
        self.password = password
        self.url_saagie = url_saagie
        self.id_platform = id_platform
        self.job_id = job_id
        self.qsa = QuerySaagieApi(url_saagie, id_platform, user, password)

    def poke(self, context):
        res = self.qsa.get_job_detail(self.job_id)
        if res.status_code != 200:
            log.info('Bad HTTP response: %s', res.status_code)
            return False  # try again at the next poke
        last_state = json.loads(res.text)['last_state']
        state = last_state['state']
        last_task_status = last_state['lastTaskStatus']
        log.info('Job currently %s', state)
        if state != 'STOPPED':
            return False  # job still running
        if last_task_status != 'SUCCESS':
            # raise (not return) the exception so that Airflow marks the task as failed
            raise AirflowException("Final status: %s" % last_task_status)
        return True  # success: go to the next operator
```
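The decision logic of poke can be pulled out into a pure function, which makes it easy to unit-test without an Airflow installation or a live platform. A minimal sketch, assuming the same JSON fields as the sensor (`last_state.state` and `last_state.lastTaskStatus`); `JobFailed` is a hypothetical stand-in for `AirflowException`, and the sample status values in the usage are invented.

```python
import json


class JobFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def job_finished_successfully(status_code, body):
    """Mirror of the sensor's poke: True when done, False to poke again, raise on failure."""
    if status_code != 200:
        return False  # API error: retry at the next poke
    last_state = json.loads(body)["last_state"]
    if last_state["state"] != "STOPPED":
        return False  # job still running
    if last_state["lastTaskStatus"] != "SUCCESS":
        raise JobFailed("Final status: %s" % last_state["lastTaskStatus"])
    return True


# Invented sample payloads; only the field names come from the sensor.
running = json.dumps({"last_state": {"state": "RUNNING", "lastTaskStatus": None}})
succeeded = json.dumps({"last_state": {"state": "STOPPED", "lastTaskStatus": "SUCCESS"}})
print(job_finished_successfully(200, running))    # False
print(job_finished_successfully(200, succeeded))  # True
```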
We still have to organize these operators and sensors in a DAG. Here is a simple example:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.saagie_plugin import SaagieOperator, SaagieSensor


def print_hello():
    return "Hello world!"


dag = DAG('saagie_demo_dag',
          description='Example Airflow + Saagie',
          schedule_interval='0 12 * * *',  # every day at 12:00
          start_date=datetime(2019, 9, 18),
          catchup=False,
          max_active_runs=1)

run_job = SaagieOperator(...)
sensor_job = SaagieSensor(...)
hello_operator = PythonOperator(...)

# Launch the job, wait for it to succeed, then run the Python task
run_job >> sensor_job >> hello_operator
```
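The schedule_interval `'0 12 * * *'` is a cron expression: minute 0, hour 12, every day of every month, whatever the weekday. The hypothetical helper below lists the next matching times for this daily pattern only; it is not a general cron parser. Keep in mind that Airflow starts the run for an interval once that interval has ended: with `start_date=datetime(2019, 9, 18)`, the run stamped 2019-09-18 12:00 can only start once 2019-09-19 12:00 has passed (times are in UTC unless configured otherwise).

```python
from datetime import datetime, timedelta


def next_daily_runs(after, hour=12, minute=0, count=3):
    """Next `count` times matching the cron pattern 'minute hour * * *', strictly after `after`."""
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)  # today's slot has already passed
    return [candidate + timedelta(days=i) for i in range(count)]


for run in next_daily_runs(datetime(2019, 9, 18, 9, 30)):
    print(run)
# 2019-09-18 12:00:00
# 2019-09-19 12:00:00
# 2019-09-20 12:00:00
```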
In this DAG, the SaagieOperator first launches a job in the Saagie environment; the sensor then checks the job's status before the last task (the PythonOperator) runs.
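The `run_job >> sensor_job >> hello_operator` line relies on Python operator overloading: Airflow's BaseOperator implements `>>` so that `a >> b` sets `b` downstream of `a` and returns `b`, which lets chains read left to right. The toy `Task` class below (hypothetical, not Airflow's implementation) reproduces just that mechanism.

```python
class Task:
    """Toy task that records downstream dependencies, mimicking the `a >> b` syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `self >> other` means: run `other` after `self`.
        self.downstream.append(other)
        return other  # returning `other` makes `a >> b >> c` equal to `(a >> b) >> c`

    def __repr__(self):
        return "Task(%r)" % self.task_id


run_job, sensor_job, hello = Task("run_job"), Task("sensor_job"), Task("hello")
run_job >> sensor_job >> hello
print(run_job.downstream, sensor_job.downstream)
# [Task('sensor_job')] [Task('hello')]
```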
As we have seen in this article, it is easy to combine a scheduler like Airflow with Saagie's orchestration solution. Because Saagie exposes an API for manipulating projects, environment variables and above all jobs, a large number of tasks can be automated. Although Saagie has its own scheduler, you can perfectly well use an external scheduler, whether open source or commercial.