Saturday, April 16, 2016

Airflow - Beginners Tutorial

Airflow is a workflow engine from Airbnb. Airbnb developed it for internal use and has recently open-sourced it. In Airflow, workflows are defined programmatically. The Airflow documentation says that building workflows this way makes them more maintainable; I would leave that to everyone's judgement. A developer, anyway, loves anything programmatic. :) Airflow also comes with a UI, and I can say that the UI is very clean and impressive. 

The main concept in Airflow is the DAG (Directed Acyclic Graph). Yes, it's the same graph you may have seen in maths. A DAG contains vertices and directed edges, and by following the directed edges you can never get back to the vertex you started from. If you could, the workflow might get into an infinite loop. In a workflow context, tasks are the vertices and the sequence is represented by the directed edges. The sequence decides the order in which the tasks are performed.
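To make the "acyclic" part concrete, here is a small pure-Python sketch. It is not Airflow code; the function `topological_order` and the dictionary layout (each task mapped to its downstream tasks) are illustrative assumptions. It orders tasks with Kahn's algorithm and rejects any graph that contains a cycle. The edges mirror the four-task workflow written in the "Writing a DAG" section.

```python
from collections import deque


def topological_order(edges):
    """Return the tasks in an order that respects every edge (Kahn's algorithm).

    `edges` maps each task to the tasks that depend on it (its downstream tasks).
    Raises ValueError if the graph contains a cycle, i.e. is not a DAG.
    """
    # Collect every vertex, including ones that only appear as edge targets.
    nodes = set(edges)
    for targets in edges.values():
        nodes.update(targets)

    # Count the incoming edges (prerequisites) of each vertex.
    indegree = {node: 0 for node in nodes}
    for targets in edges.values():
        for target in targets:
            indegree[target] += 1

    # Start with the vertices that have no prerequisites.
    ready = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for target in sorted(edges.get(node, ())):
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    # A vertex whose indegree never reached 0 lies on (or behind) a cycle.
    if len(order) != len(nodes):
        raise ValueError("graph has a cycle, so it is not a DAG")
    return order


# t1 -> t2, t1 -> t3, t2 -> t4, t3 -> t4
hello_world = {"t1": ["t2", "t3"], "t2": ["t4"], "t3": ["t4"]}
print(topological_order(hello_world))  # ['t1', 't2', 't3', 't4']
```

Adding an edge such as `"t4": ["t1"]` would close a loop, and the function would raise instead of returning an order.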

Make no mistake: Airflow is just a workflow engine. It is only responsible for defining tasks and their sequence; the details of each task have to be handled by the task itself. Airflow provides hooks for initiating tasks and has integration points to other systems, but at the end of the day it's a workflow management system and no more than that. 

Oh! One thing I need to mention before we move ahead: Airflow is written in Python, and the workflows are also defined in Python. 

Installation

Airflow needs a home directory, which can be any location. Here we put it in ~/airflow:

export AIRFLOW_HOME=~/airflow

Install airflow

pip install airflow

Initialize the database

airflow initdb

Start the webserver

airflow webserver -p 8080

Writing a DAG

Now let's write a workflow in the form of a DAG. We will have four tasks: t1, t2, t3 and t4. t4 will depend on t2 and t3; t2 and t3, in turn, will depend on t1. Let's name the script helloWorld.py and put it in the dags folder under the Airflow home. Create the dags folder if it's not there.


from airflow import DAG
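# Import from the operator's own module; importing directly from airflow.operators is deprecated
from airflow.operators.bash_operator import BashOperator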
from datetime import datetime, timedelta

# Default arguments applied to every task; individual tasks can override them
default_args = {
    'owner': 'lalit.bhatt',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 15),
    'email': ['lalit.bhatt@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# No schedule_interval is given, so the DAG uses the default daily schedule
dag = DAG('HelloWorld', default_args=default_args)

# t1, t2, t3 and t4 are examples of tasks created using operators

t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Hello World from Task 1"',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello World from Task 2"',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Hello World from Task 3"',
    dag=dag)

t4 = BashOperator(
    task_id='task_4',
    bash_command='echo "Hello World from Task 4"',
    dag=dag)

# Wire up the dependencies: t1 runs first, t2 and t3 after t1, and t4 after both
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t4.set_upstream(t3)
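Because t2 and t3 both depend only on t1, nothing forces one of them to wait for the other. The plain-Python sketch below (not Airflow's scheduler; `execution_waves` and the `upstream` dictionary are hypothetical) groups the tasks into waves, where every task's upstream tasks have finished in an earlier wave. Whether t2 and t3 actually run at the same time depends on the executor: the default SequentialExecutor runs one task at a time.

```python
def execution_waves(upstream):
    """Group tasks into waves; a task joins a wave once all its upstream tasks are done.

    `upstream` maps each task to the set of tasks it depends on, mirroring
    what the set_upstream() calls record in the DAG file.
    """
    remaining = {task: set(deps) for task, deps in upstream.items()}
    done = set()
    waves = []
    while remaining:
        # A task is ready when every one of its upstream tasks is already done.
        ready = sorted(t for t, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError("no task is ready: cycle or unknown upstream task")
        waves.append(ready)
        done.update(ready)
        for task in ready:
            del remaining[task]
    return waves


# Same wiring as the set_upstream() calls in helloWorld.py
upstream = {
    "task_1": set(),
    "task_2": {"task_1"},
    "task_3": {"task_1"},
    "task_4": {"task_2", "task_3"},
}
print(execution_waves(upstream))
# [['task_1'], ['task_2', 'task_3'], ['task_4']]
```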

Testing and Installing DAG

Make sure the script compiles. Go to the dags folder and run

python helloWorld.py
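Running `python helloWorld.py` executes the file, so it catches both syntax errors and failed imports. If you only want a quick syntax check of every file in the dags folder without executing anything, a sketch using Python's built-in `compile()` could look like this. The `check_dags_folder` helper is hypothetical, and unlike running the script it will not detect a broken import.

```python
import os
from pathlib import Path


def check_dags_folder(dags_dir):
    """Compile (but do not run) every .py file in dags_dir; return the syntax errors.

    Only syntax is checked, so a misspelled module in an import statement
    is NOT reported here.
    """
    failures = {}
    for path in sorted(Path(dags_dir).glob("*.py")):
        source = path.read_text()
        try:
            compile(source, str(path), "exec")
        except SyntaxError as err:
            failures[path.name] = f"line {err.lineno}: {err.msg}"
    return failures


if __name__ == "__main__":
    # Assumes AIRFLOW_HOME is set as in the installation step; falls back to ~/airflow.
    dags = Path(os.environ.get("AIRFLOW_HOME", "~/airflow")).expanduser() / "dags"
    for name, message in check_dags_folder(dags).items():
        print(name, message)
```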

Some useful commands

List all DAGs

airflow list_dags   

List the tasks of a given DAG

airflow list_tasks HelloWorld

Backfill the DAG over a date range (-s is the start date, -e the end date)

airflow backfill HelloWorld -s 2015-04-12 -e 2015-04-15
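A backfill creates one run per schedule interval in the requested range. As a rough illustration only, assuming the DAG keeps its default daily schedule and that both the -s and -e dates are included, the execution dates covered by the command above could be listed like this. `backfill_dates` is a hypothetical helper, not part of Airflow.

```python
from datetime import date, timedelta


def backfill_dates(start, end, interval=timedelta(days=1)):
    """List the dates from start to end, inclusive, stepping by a fixed interval.

    Assumes a fixed interval (daily by default); Airflow schedules can also
    be cron expressions, which this sketch does not handle.
    """
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


# Mirrors: airflow backfill HelloWorld -s 2015-04-12 -e 2015-04-15
for d in backfill_dates(date(2015, 4, 12), date(2015, 4, 15)):
    print(d.isoformat())
```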

Individual tasks can also be tested for a given execution date

airflow test HelloWorld task_1 2016-04-15

For details, see the following video.



Reference: https://github.com/airbnb/airflow

Also see the discussion at https://github.com/airbnb/airflow/issues/289, which gives an idea of the possible future of Airflow.


14 comments:

  1. Afroz - Looking at the text you have copied, my best guess is a version incompatibility. Are you getting a stack trace that could help in understanding the issue better?

  2. Hi Lalit,

    Thank you for a very clearly outlined post on Airflow. I have a question. I've installed Airflow on my server as per your instructions. The installation seems to contain both the master (server) and the slave (client). How do you install the slave/client component of Airflow on another computer? Or does each installation need to contain both the server and the client?

    Thanks
    Zenro

  3. This comment has been removed by a blog administrator.

  4. Hi Lalit,

    I followed your video, but the DAG I created is listed by `airflow list_dags` and not in the UI.

    I tried refreshing the browser, but it is still the same.

    It only gets listed in the UI when I stop and start the webserver (airflow webserver -p 8080).


    Please help me to figure this out.

    _Vinish

  5. Hi Lalit,

    I followed your video to create a DAG, but the DAG I created is not getting listed in the UI.

    I tried refreshing the browser, but it is still the same.

    It gets listed by `airflow list_dags` but not in the UI unless the webserver (airflow webserver -p 8080) is stopped and started again.


    _Vinish

  6. Hi,
    Thank you for this example. It helped clear up the basics. Can you explain how to run Python functions as tasks in Airflow?
    Thanks

  7. Hi,
    Thank you for this example. It helped clear up the basics. I would like to know how to run a Python function as a task.

    Thanks

  8. Hi,
    How can I use Oracle as the backend for Airflow? I tried creating a connection and I think it was created successfully, but when I try to run an ad hoc query under Data Profiling I don't see my connection. Any help? Thanks!

    Replies
    1. It's been a while since I last used Airflow, so it would be difficult for me to help out.

  9. While installing Airflow, it shows the following error:
    error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++
    Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools

    What do I need to do?
    Do I need to install Microsoft Visual C++ 14.0, or is there an alternative?
    Please help me with this.

  11. How do I move from SQLite to Postgres? There is no documentation on moving the database. Although I ran pip install airflow[postgres], the airflow initdb command still initialized SQLite :(

    Replies
    1. It's been a long time since I used Airflow, so I'm not in a position to answer. My best bet would be to install Postgres separately and check where you can change the DB connection details.

  12. I tested the HelloWorld DAG but got this error. How do I overcome this StopIteration error? Thanks.
    (venv) [thiamhuat@ip-172-31-29-106 workspace]$ airflow test HelloWorld task_1 2018-09-12
    [2018-09-12 15:36:30,523] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2018-09-12 15:36:30,822] {models.py:167} INFO - Filling up the DagBag from /home/thiamhuat/airflow/workspace/airflow_home/dags
    /home/thiamhuat/airflow/workspace/venv/lib/python3.6/site-packages/airflow/utils/helpers.py:406: DeprecationWarning: Importing BashOperator directly from has been deprecated. Please import from '.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
    DeprecationWarning)
    /home/thiamhuat/airflow/workspace/venv/lib/python3.6/site-packages/airflow/models.py:1140: DeprecationWarning: generator 'get_dep_statuses' raised StopIteration
    dep_context):
    /home/thiamhuat/airflow/workspace/venv/lib/python3.6/site-packages/airflow/ti_deps/deps/base_ti_dep.py:94: DeprecationWarning: generator '_get_dep_statuses' raised StopIteration
    for dep_status in self._get_dep_statuses(ti, session, dep_context):
    [2018-09-12 15:36:31,439] {models.py:1126} INFO - Dependencies all met for
    [2018-09-12 15:36:31,441] {models.py:1126} INFO - Dependencies all met for
    [2018-09-12 15:36:31,441] {models.py:1318} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 2
    --------------------------------------------------------------------------------

    [2018-09-12 15:36:31,442] {models.py:1342} INFO - Executing on 2018-09-12 00:00:00
    [2018-09-12 15:36:31,455] {bash_operator.py:71} INFO - tmp dir root location:
    /tmp
    [2018-09-12 15:36:31,457] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpk9d6hoxb//tmp/airflowtmpk9d6hoxb/task_1jj8vb348
    [2018-09-12 15:36:31,457] {bash_operator.py:81} INFO - Running command: echo "Hello World from Task 1"
    [2018-09-12 15:36:31,461] {bash_operator.py:90} INFO - Output:
    [2018-09-12 15:36:31,463] {bash_operator.py:94} INFO - Hello World from Task 1
    [2018-09-12 15:36:31,463] {bash_operator.py:97} INFO - Command exited with return code 0
