Saturday, April 16, 2016

Airflow - Beginners Tutorial

Airflow is a workflow engine from Airbnb. Airbnb developed it for internal use and recently open sourced it. In Airflow, workflows are defined programmatically. The Airflow documentation says it's more maintainable to build workflows this way; I would leave that to everyone's judgement, though a developer would, anyway, love anything programmatic. :) Airflow also comes with a UI, and I can say that the UI is very clean and impressive.

The main concept in Airflow is a DAG (Directed Acyclic Graph). Yes, it's the same graph you may have seen in maths. A DAG contains vertices and directed edges. In a DAG, you can never get back to the vertex you started from by following the directed edges; otherwise your workflow could get into an infinite loop. In the workflow context, tasks are the vertices and the sequence is represented by the directed edges. The sequence decides the order in which the tasks will be performed. For example, t1 -> t2 -> t4 together with t1 -> t3 -> t4 forms a valid DAG, while adding an edge t4 -> t1 would create a cycle and is not allowed.
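
To make the idea concrete, here is a small, framework-independent Python sketch (illustration only, not Airflow code) that represents the four-task workflow we will build later in this post as an adjacency list and checks that it contains no cycles

# The workflow as an adjacency list: an edge a -> b means "a runs before b"
edges = {
    't1': ['t2', 't3'],
    't2': ['t4'],
    't3': ['t4'],
    't4': [],
}

def has_cycle(graph):
    # Depth-first search; reaching a vertex that is still on the current path
    # means we followed directed edges back to where we started
    visiting, done = set(), set()

    def visit(node):
        if node in done:
            return False
        if node in visiting:
            return True
        visiting.add(node)
        if any(visit(nxt) for nxt in graph[node]):
            return True
        visiting.remove(node)
        done.add(node)
        return False

    return any(visit(node) for node in graph)

print(has_cycle(edges))  # False, so this workflow is a valid DAG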

Make no mistake about the fact that Airflow is just a workflow engine. It is only responsible for defining tasks and their sequence; the details of a task have to be handled by each task on its own. Airflow provides hooks for initiating tasks and has integration points to other systems, but at the end of the day it's a workflow management system and no more than that.

Oh, one thing needs mentioning before we move ahead: Airflow is written in Python, and workflows are also defined using Python.

Installation

Airflow needs a home directory, and we can place it anywhere. Let's keep it in the user's home directory

export AIRFLOW_HOME=~/airflow

Install airflow

pip install airflow

Initialize the database (by default this generates airflow.cfg and a SQLite database under AIRFLOW_HOME)

airflow initdb

Start the webserver

airflow webserver -p 8080
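
The UI should now be available at http://localhost:8080. For DAGs to actually get scheduled and executed, the scheduler also has to be running. Start it in a separate terminal

airflow scheduler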

Writing a DAG

Now let's write a workflow in the form of a DAG. We will have four tasks: t1, t2, t3 and t4. t4 will depend on t2 and t3; t2 and t3, in turn, will depend on t1. Let's name the script helloWorld.py and put it in the dags folder of the Airflow home. Create the dags folder if it's not there.


from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'lalit.bhatt',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 15),
    'email': ['lalit.bhatt@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# schedule_interval is not set here, so Airflow uses its default (daily runs)
dag = DAG('HelloWorld', default_args=default_args)

# t1, t2, t3 and t4 are examples of tasks created using operators

t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Hello World from Task 1"',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello World from Task 2"',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Hello World from Task 3"',
    dag=dag)

t4 = BashOperator(
    task_id='task_4',
    bash_command='echo "Hello World from Task 4"',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t4.set_upstream(t3)
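
The same dependencies can also be declared in the opposite direction with set_downstream; the resulting graph is identical

t1.set_downstream(t2)   # t2 runs after t1
t1.set_downstream(t3)   # t3 runs after t1
t2.set_downstream(t4)   # t4 runs after t2 ...
t3.set_downstream(t4)   # ... and after t3

Newer Airflow releases also offer a bitshift shorthand (for example t1 >> t2); check whether your installed version supports it before relying on it.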

Testing and Installing DAG

Make sure the script compiles. Go to the dags folder and run the command below; if it exits without raising an exception, the DAG definition is fine

python helloWorld.py

Some useful commands

List all DAGs

airflow list_dags   

List Tasks for the given DAG

airflow list_tasks HelloWorld
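
list_tasks can also print the tasks laid out as a dependency tree

airflow list_tasks HelloWorld --tree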

Backfilling

Backfill executes the DAG for a range of past dates. The dates should normally fall on or after the DAG's start_date (2016-04-15 in our example)

airflow backfill HelloWorld -s 2016-04-15 -e 2016-04-16

Individual tasks can also be tested. The test subcommand runs a single task for a given execution date without recording state in the database

airflow test HelloWorld task_1 2016-04-15
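
Unlike test, the run subcommand executes a single task instance and does record its state in the metadata database

airflow run HelloWorld task_1 2016-04-15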

Reference: https://github.com/airbnb/airflow

Also see the discussion at https://github.com/airbnb/airflow/issues/289, which gives an idea about the possible future of Airflow.


Comments:

  1. Hi Lalit,

    Thanks for posting it. It is very helpful. I have tried installing it by following your steps on RHEL 6.5 and 7, but I am getting an error:
    Python Version: Python 2.6.6 (tried on 2.7)
    Operating System: RedHat 6.5 (tried on RedHat 7.5)
    Python packages: argparse==1.2.1 aws-cfn-bootstrap==1.3 backports.ssl-match-hostname==3.4.0.2 boto==2.34.0 cas==0.15 chardet==2.0.1 Cheetah==2.4.1 cloud-init==0.7.2 configobj==4.6.0 ethtool==0.6 freeipa==2.0.0a0 iniparse==0.3.1 iotop==0.3.2 ipapython==3.0.0 iwlib==1.0 kerberos==1.1 lockfile==0.9.1 lxml==2.2.3 M2Crypto==0.20.2 Markdown==2.0.1 matplotlib==0.99.1.1 netaddr==0.7.5 nose==0.10.4 numpy==1.4.1 ordereddict==1.2 paramiko==1.7.5 policycoreutils-default-encoding==0.1 prettytable==0.6.1 pycparser==2.14 pycrypto==2.0.1 pycurl==7.19.0 Pygments==1.1.1 pygpgme==0.1 pyOpenSSL==0.10 pystache==0.5.4 python-daemon==1.5.5 python-dateutil==1.4.1 python-default-encoding==0.1 python-dmidecode==3.10.13 python-ldap==2.3.10 python-nss==0.16.0 pytz===2010h PyYAML==3.10 requests==1.1.0 rhnlib==2.5.22 rhsm==1.12.5 setools==1.0 simplejson==2.0.9 six==1.9.0 SSSDConfig==1.12.4 urlgrabber==3.9.1
    int exp (void);
    Any help would be appreciated.
    Thanks,
    Afroz Hussain


    Replies
    1. Afroz - Looking at the text you have copied, my best guess is version incompatibilities. Are you getting a stack trace that can help in understanding the issue better?

  2. Hi Lalit,

    Thank you for a very clearly outlined post on Airflow. I have a question. I've installed Airflow onto my server as per your instructions. The installation looks to contain both the master (server) and slave (client). How do you install the slave/client component of Airflow on another computer? Or does each installation need to contain both the server and client?

    Thanks
    Zenro
