Plongez au cœur de l'IA Générative avec notre livre blanc !

logo saagie red
illustration blog apache airflow

Comment Ordonnancer Facilement des Jobs avec Apache Airflow ?

Cet article s’adresse aussi bien aux débutants qu’aux vétérans d’Airflow et vise à présenter les objets fondamentaux de cette technologie ainsi que son interfaçage avec la solution d’orchestration Saagie. Nous n’allons pas vous ré-expliquer une énième fois comment créer un Graphe Acyclique Orienté (communément appelé DAG pour Directed Acyclic Graph) ou encore comment les planifier. En effet, il existe déjà de nombreux tutoriels / ressources sur ce sujet. Nous ferons juste un rappel concis sur les concepts que nous allons utiliser par la suite. Ici, nous allons voir comment interagir avec Saagie depuis Airflow.

Introduction à Apache Airflow

Airflow est une solution d’ordonnancement pour coder, planifier et superviser vos workflows de traitements. Désormais en incubation sous la fondation Apache, c’est l’entreprise Airbnb qui a développé ce projet pour leurs besoins internes avant de le rendre open-source en 2016. Depuis de nombreuses entreprises s’en servent pour gérer, ordonnancer ou encore superviser leurs workflows.

Airflow va donc vous permettre de créer vos workflows sous la forme d’un DAG (Graphe Acyclique Orienté). Vous pourrez définir un enchaînement d’opérations où certaines seront exécutées en séquence et d’autres en parallèle. Cependant, il n’existe pas d’interface interactive pour construire votre DAG, vous serez obligé de coder en Python pour configurer vos opérations et définir votre DAG. Pas d’inquiétude, très peu de connaissances dans ce langage seront nécessaires pour réaliser des workflows simples. Airflow vous propose aussi une interface web pour faciliter le monitoring de vos pipelines.

airflow gif

Les concepts

DAG – Directed Acyclic Graph

Comme dit précédemment, Airflow va vous permettre d’exécuter vos tâches en définissant leur ordre d’exécution. Vous pouvez rendre des tâches dépendantes d’autres pour ainsi créer une séquence. Il est aussi possible de configurer les tâches pour qu’elles soient exécutées en parallèle. Les DAG ne sont cependant là que pour organiser l’ordonnancement de vos tâches.

graph

Operators et Sensors

Ces tâches peuvent être de deux natures différentes:

  • Les operators vont exécuter une opération (par exemple, lancer un code Python, créer une instance sur EC2 sur Amazon ou encore lancer une requête HQL sur Hive). De nombreux operators sont déjà mis à disposition dans Airflow.
  • Les sensors ont « mettre en pause » l’exécution jusqu’à ce qu’une condition (ou des conditions) soit remplie(s). Cela peut être par exemple l’attente du dépôt d’un fichier sur HDFS ou tout simplement un timer. De la même manière, on retrouve un certain nombre de sensors dans Airflow.

Executors

Maintenant que les tâches sont définies, il nous faut pouvoir les exécuter. Actuellement, Airflow dispose de 3 types d’executors :

  • Le SequentialExecutor, comme son nom l’indique, va lancer les tâches les unes après les autres. Dans ce mode, impossible de lancer des tâches en parallèle.
  • Le LocalExecutor lancé sur une machine pourra exécuter plusieurs tâches en parallèle (scaling vertical).
  • Le CeleryExecutor va, quant à lui, nous permettre de scaler de manière horizontale. En effet, après avoir installé le « cluster » Celery, les tâches seront transmises aux workers qui se chargeront de les exécuter.

Il est important de noter qu’il est nécessaire d’avoir un serveur Airflow à disposition (le scheduler) dans lequel on y déposera nos DAG. C’est celui-ci qui transmettra à l’executor les tâches qu’il doit réaliser. Si vous êtes déjà utilisateur de Saagie, vous pouvez tout aussi bien « conteneuriser » AirFlow en chargeant une image Docker dans votre projet. Ainsi, vous bénéficierez à la fois des aptitudes d’orchestration de Saagie et d’ordonnancement d’AirFlow. Pour rappel, il est important de distinguer ces deux notions : l’ordonnancement et l’orchestration. L’ordonnancement est la planification de tâches sous forme de pipelines conditionnels (DAG) tandis que l’orchestration – selon Saagie – est un ensemble de 3 aptitudes : l’ordonnancement précédemment décrit, l’instanciation de conteneurs et l’allocation de ressources matérielles.

Comment utiliser Airflow avec Saagie ?

Commençons par parler des plugins d’Airflow. Utiliser les plugins est un moyen de créer un nouvel ensemble d’operators et de sensors, etc., pour interagir avec votre écosystème. C’est donc un moyen facile de créer et partager de nouvelles fonctionnalités.

Il important de noter que Saagie vous met à disposition une API REST pour interagir avec la solution. Vous pouvez ainsi mettre à jour, lancer vos jobs ou encore créer une nouvelle variable d’environnement. C’est donc en utilisant l’API que nous allons pouvoir créer nos propres operators et sensors Airflow.

Lien vers la documention de l’API

De notre côté, nous avons créé un wrapper Python pour appeler cette API ce qui simplifiera la création des operators et sensors.

Dans un premier temps, nous allons créer un operator qui lancera un job dans un environnement Saagie. Pour cela, on définit une nouvelle classe qui étend la classe BaseOperator et dans laquelle on doit implémenter la fonction execute. De plus, pour créer le plugin, nous allons définir une nouvelle class SaagiePlugin qui dérive de la class airflow.plugins_manager.AirflowPlugin. C’est dans cette dernière que nous allons faire référence aux objets que nous souhaitons inclure dans notre plugin. Une fois créés, ceux-ci devront être placés dans le répertoire $AIRFLOW_HOME/plugins.

Operator

import logging
import json

from airflow.models import BaseOperator
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from querySaagieApi import QuerySaagieApi
from airflow import AirflowException

log = logging.getLogger(__name__)


class SaagieLaunchOperator(BaseOperator):

    def __init__(self, user, password, url_saagie, id_platform, job_id, *args, **kwargs):
        super(SaagieLaunchOperator, self).__init__(*args, **kwargs)
        self.user = user
        self.password = password
        self.url_saagie = url_saagie
        self.id_platform = id_platform
        self.job_id = job_id
        self.qsa = QuerySaagieApi(url_saagie, id_platform, user, password)

    def execute(self, context):
        res = self.qsa.run_job(self.job_id)
        if res.status_code != 204:
            raise AirflowException("Did not manage to launch the job, please check your parameters.")
        log.info("Job %s is launched", self.job_id)

class SaagiePlugin(AirflowPlugin):
    name = "saagie_plugin"
    operators = [SaagieOperator]
    sensors = [SaagieSensor]

Maintenant que notre operator a été mis en place, nous souhaiterions connaître le statut d’un job et s’il passe en success de continuer et d’exécuter les tâches suivantes dans notre DAG. Pour cela, nous allons devoir mettre en place un sensor. Il suffit de créer une nouvelle classe qui étend BaseSensorOperator et d’implémenter la fonction poke (cette fonction est appelée de manière régulière, on peut choisir l’intervalle).

Sensor

class SaagieSensor(BaseSensorOperator):

    def __init__(self, user, password, url_saagie, id_platform, job_id, *args, **kwargs):
        super(SaagieSensor, self).__init__(*args, **kwargs)
        self.user = user
        self.password = password
        self.url_saagie = url_saagie
        self.id_platform = id_platform
        self.job_id = job_id
        self.qsa = QuerySaagieApi(url_saagie, id_platform, user, password)

    def poke(self, context):
        res = self.qsa.get_job_detail(self.job_id)

        if not res.status_code == 200:
            log.info('Bad HTTP response: %s', res.status_code)
            return False

        state = json.loads(res.text)['last_state']['state']
        last_task_status = json.loads(res.text)['last_state']['lastTaskStatus']
        log.info('Job currently %s', state)

        if state != 'STOPPED':
            return False
        elif last_task_status != 'SUCCESS':
            return AirflowException("Final status : %s", last_task_status)
        else:
            # Go to next operator
            return True
Il nous reste à organiser ces operator et sensor dans un DAG. Voici un exemple simple :
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.saagie_plugin import SaagieOperator, SaagieSensor

def print_hello():
    return "Hello world!"

dag = DAG('saagie_demo_dag', description='Example Airflow + Saagie',
          schedule_interval='0 12 * * *',
          start_date=datetime(2019, 9, 18), catchup=False, max_active_runs=1)

run_job = SaagieOperator(...)
sensor_job = SaagieSensor(...)
hello_operator = PythonOperator(...)

run_job >> sensor_job >> hello_operator
Celui-ci va d’abord lancer un job dans l’environnement Saagie, le sensor va ensuite vérifier l’état du job avant passer au dernier job (PythonOperator). Comme on l’a vu dans cet article, il est facile de créer une synergie entre un ordonnanceur comme Airflow et la solution d’orchestration Saagie. En effet, Saagie, en incluant une API pour manipuler les projets, les variables d’environnement mais surtout les jobs permettent d’automatiser un grand nombre de tâches. Bien que Saagie possède son propre ordonnanceur, il est tout à fait possible d’utiliser un ordonnanceur externe open source ou commercial.