Airflow is a workflow engine from Airbnb. Airbnb developed it for internal use and recently open sourced it. In Airflow, workflows are defined programmatically. The Airflow documentation says that workflows are more maintainable this way; I will leave that to everyone's judgement. A developer will love anything programmatic anyway. :) Airflow also comes with a UI, and I can say that the UI is very clean and impressive.
The main concept in Airflow is the DAG (Directed Acyclic Graph). Yes, it's the same graph that you may have seen in maths. A DAG contains vertices and directed edges. In a DAG, you can never get back to the vertex you started from by following the directed edges; otherwise your workflow could get into an infinite loop. In the workflow context, each task is a vertex and the sequencing is represented by the directed edges. The sequence decides the order in which the tasks will be performed.
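To make this concrete, here is a small plain-Python sketch (not Airflow code; the task names match the example built later in this post) that stores the directed edges of a four-task graph and derives a valid execution order, complaining if a cycle sneaks in:
# Tasks are vertices; the edge (a, b) means "a must run before b".
edges = [('t1', 't2'), ('t1', 't3'), ('t2', 't4'), ('t3', 't4')]
remaining = {'t1', 't2', 't3', 't4'}
order = []
while remaining:
    # A task is ready when none of its prerequisites are still pending.
    ready = [t for t in remaining
             if not any(dst == t and src in remaining for src, dst in edges)]
    if not ready:
        raise ValueError('cycle detected - this is not a DAG')
    for t in sorted(ready):
        order.append(t)
    remaining -= set(ready)
print(order)  # ['t1', 't2', 't3', 't4']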
Make no mistake about the fact that Airflow is just a workflow engine. It is only responsible for defining tasks and their sequence. The details of what a task does have to be handled by the task itself. Airflow provides hooks for initiating tasks and has integration points to other systems, but at the end of the day it's a workflow management system and no more than that.
Oh, one thing to mention before we move ahead: Airflow is written in Python, and the workflows are also defined in Python.
Installation
Airflow needs a home, and we can give it any location. For this, let's place it under the user's home directory:
export AIRFLOW_HOME=~/airflow
Install airflow
pip install airflow
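Optional extras pull in dependencies for specific integrations; for example, Postgres support (quoting matters in some shells):
pip install "airflow[postgres]"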
Initialize database
airflow initdb
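By default this creates an airflow.cfg file and a SQLite database inside AIRFLOW_HOME. The database backend can be changed later by editing the sql_alchemy_conn setting in airflow.cfg (a question that comes up in the comments below).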
Start the webserver
airflow webserver -p 8080
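Note that the webserver only serves the UI. For scheduled DAG runs to be picked up and executed, the scheduler also has to be running, typically in a separate terminal:
airflow scheduler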
Writing a DAG
Now let's write a workflow in the form of a DAG. We will have four tasks: t1, t2, t3 and t4. t4 will depend on t2 and t3, which in turn will depend on t1. Let's name the script helloWorld.py and put it in the dags folder of the Airflow home. Create the dags folder if it's not there.
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'lalit.bhatt',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 15),
    'email': ['lalit.bhatt@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('HelloWorld', default_args=default_args)

# t1, t2, t3 and t4 are examples of tasks created using operators
t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Hello World from Task 1"',
    dag=dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello World from Task 2"',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Hello World from Task 3"',
    dag=dag)

t4 = BashOperator(
    task_id='task_4',
    bash_command='echo "Hello World from Task 4"',
    dag=dag)

# Wire up the dependencies: t2 and t3 run after t1; t4 runs after both
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t4.set_upstream(t3)
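The same wiring can be expressed in the opposite direction with set_downstream; which direction to use is purely a matter of taste:
t1.set_downstream(t2)
t1.set_downstream(t3)
t2.set_downstream(t4)
t3.set_downstream(t4)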
Testing and Installing the DAG
Make sure the script compiles. Go to the dags folder and run
python helloWorld.py
If the DAG definition is valid, this produces no output and exits cleanly.
Some useful commands
List all DAGs
airflow list_dags
List Tasks for the given DAG
airflow list_tasks HelloWorld
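The task hierarchy can also be printed as a tree:
airflow list_tasks HelloWorld --tree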
Backfilling, i.e. running the DAG over a date range
airflow backfill HelloWorld -s 2016-04-12 -e 2016-04-15
Individual tasks can also be tested. airflow test runs a single task instance for a given execution date, without checking dependencies or recording state in the database
airflow test HelloWorld task_1 2016-04-15
For details, see the accompanying video.
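One more note before the references: the example above uses only BashOperator, but an ordinary Python function can be run as a task too, a question that comes up in the comments below. Here is a minimal sketch using PythonOperator, written in the same (old-style) import convention as the example; the function and task names are illustrative:
from airflow.operators import PythonOperator

def greet():
    # Any ordinary Python code can run here
    print("Hello World from a Python task")

t5 = PythonOperator(
    task_id='task_5',
    python_callable=greet,
    dag=dag)

# Run after t4 completes
t5.set_upstream(t4)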
Reference: https://github.com/airbnb/airflow
Also see the discussion at https://github.com/airbnb/airflow/issues/289, which gives an idea of the possible future of Airflow.
Comments
Afroz - Looking at the text that you have copied, my best guess is version incompatibilities. Are you getting a stack trace which can help in understanding the issue better?
Hi Lalit,
Thank you for a very clearly outlined post on Airflow. I have a question. I've installed Airflow onto my server as per your instructions. The installation looks to contain both the master (server) and slave (client). How do you install the slave/client component of Airflow on another computer? Or does each installation need to contain both the server and client?
Thanks
Zenro
Hi Lalit,
I followed your video to create the DAG, but the created DAG is listed by `airflow list_dags` and not in the UI. Tried a browser refresh, still the same. It gets listed in the UI only when I stop and start the webserver (airflow webserver -p 8080). Please help me figure this out.
_Vinish
Hi,
Thank you for this example. It helped clear out the basics. Can you explain how to run Python functions as tasks in Airflow?
Thanks
Hi,
How can I make Oracle the backend for Airflow? I tried creating a connection and I think it was created successfully. When I try to perform an ad-hoc query under Data Profiling, I don't see my connection. Any help? Thanks!
It's been a while since I have used Airflow, so it would be difficult for me to help out.
While installing Airflow it shows the following error:
error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools
What should I do? Do I need to install Microsoft Visual C++ 14.0, or is there an alternative? Please help me with this.
How do I move from SQLite to Postgres? There is no documentation on moving the database. I used the command pip install airflow[postgres], but when the airflow initdb command was run it still instantiated SQLite. :(
It's been a long time since I used Airflow, so I am not in a position to answer. My best bet would be to install Postgres separately and check where you can change the db connection details.
I tested out the HelloWorld DAG, but it gives this error. How do I overcome this StopIteration error? Thanks.
(venv) [thiamhuat@ip-172-31-29-106 workspace]$ airflow test HelloWorld task_1 2018-09-12
[2018-09-12 15:36:30,523] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-09-12 15:36:30,822] {models.py:167} INFO - Filling up the DagBag from /home/thiamhuat/airflow/workspace/airflow_home/dags
/home/thiamhuat/airflow/workspace/venv/lib/python3.6/site-packages/airflow/utils/helpers.py:406: DeprecationWarning: Importing BashOperator directly from has been deprecated. Please import from '.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
DeprecationWarning)
/home/thiamhuat/airflow/workspace/venv/lib/python3.6/site-packages/airflow/models.py:1140: DeprecationWarning: generator 'get_dep_statuses' raised StopIteration
dep_context):
/home/thiamhuat/airflow/workspace/venv/lib/python3.6/site-packages/airflow/ti_deps/deps/base_ti_dep.py:94: DeprecationWarning: generator '_get_dep_statuses' raised StopIteration
for dep_status in self._get_dep_statuses(ti, session, dep_context):
[2018-09-12 15:36:31,439] {models.py:1126} INFO - Dependencies all met for
[2018-09-12 15:36:31,441] {models.py:1126} INFO - Dependencies all met for
[2018-09-12 15:36:31,441] {models.py:1318} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------
[2018-09-12 15:36:31,442] {models.py:1342} INFO - Executing on 2018-09-12 00:00:00
[2018-09-12 15:36:31,455] {bash_operator.py:71} INFO - tmp dir root location:
/tmp
[2018-09-12 15:36:31,457] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpk9d6hoxb//tmp/airflowtmpk9d6hoxb/task_1jj8vb348
[2018-09-12 15:36:31,457] {bash_operator.py:81} INFO - Running command: echo "Hello World from Task 1"
[2018-09-12 15:36:31,461] {bash_operator.py:90} INFO - Output:
[2018-09-12 15:36:31,463] {bash_operator.py:94} INFO - Hello World from Task 1
[2018-09-12 15:36:31,463] {bash_operator.py:97} INFO - Command exited with return code 0