
Airflow is a platform to programmatically author, schedule and monitor workflows.
We use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. We had heard about Airflow but never knew how to get it working for our Talend jobs; it turns out to be very easy, since we get a UI for scheduling and monitoring the Talend jobs.
Let's check our to-do list to achieve the goal:
- Launching the EC2 instance: we will launch an Ubuntu server for the installation of Airflow and for copying the Talend jobs onto the server
- Installing Airflow on the EC2 instance: we will follow the steps for the installation of Airflow and get the Airflow webserver working
- Adding the Talend job and creating the DAG file
Launching an EC2 instance in AWS
- We will launch an Ubuntu 16.04 instance for Airflow
Adding Airflow to the Ubuntu Server
Let's start by updating the package lists with the command:
sudo apt-get update
We will switch to the root user before installing the Airflow webserver:
sudo -i
To install Airflow we will first need Python and pip:
sudo apt-get install python-setuptools -y
sudo apt-get install python-pip -y
sudo pip install --upgrade pip
We will copy the pip executable from /usr/local/bin to /usr/bin:
sudo cp -v /usr/local/bin/pip /usr/bin/pip
Now we will start with the installation. Before installing, we have to set the environment variable SLUGIFY_USES_TEXT_UNIDECODE=yes
pip install apache-airflow [If it keeps giving an error, then run the command SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow] and Apache Airflow will be installed.
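A minimal sketch of the same step with the variable exported first, plus a quick check afterwards (assuming a bash shell; airflow version is a standard Airflow 1.x CLI command):
export SLUGIFY_USES_TEXT_UNIDECODE=yes
pip install apache-airflow
airflow version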
As Talend jobs need Java to run, we will now install Java with the following commands:
sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-7-jdk
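Once the installation finishes, you can verify that Java is available; this check is not part of the original steps, just a quick sanity test:
java -version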
Now we will initialise Airflow so that the default configuration is created:
sudo airflow initdb
Change the configuration by going to the file /root/airflow/airflow.cfg, and also create the directories manually in the user folder (see the commands after this list):
- airflow_home = /home/ubuntu/airflow
- dags_folder = /home/ubuntu/airflow/dags
- base_log_folder = /home/ubuntu/airflow/logs
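A minimal sketch of creating those directories, assuming the /home/ubuntu paths used in the configuration above:
mkdir -p /home/ubuntu/airflow/dags
mkdir -p /home/ubuntu/airflow/logs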
We will initialise Airflow again so that the changes we made in the configuration file take effect:
sudo airflow initdb
Next we start the Airflow webserver, which serves the Airflow UI:
sudo airflow webserver
Once the Airflow webserver has started, the UI is available at the public IP address of the Ubuntu server on port 8080, which is the default port used by the Airflow configuration.
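If a different port is needed, the webserver accepts a port flag (a sketch; 8080 is shown here only because it is the default mentioned above):
sudo airflow webserver -p 8080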
To start the scheduler, run airflow scheduler; once started, it handles the scheduling of the jobs and makes them run as per the given schedule:
sudo airflow scheduler
Adding the job and creating the DAG file
- We will add the Talend job to the home/user folder on the Ubuntu server
- We will change the permissions of the shell script which we want to run
- chmod 777 /home/ubuntu/sample_0.1/sample/sample_run.sh
- We will create the .py file and add it to the dags folder at the path /home/ubuntu/airflow/dags. Airflow runs the files in the dags folder as per the schedule_interval given there.
Following is the DAG file created:
# this import is used to instantiate a DAG
from airflow import DAG
# this import is used to run the tasks
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# passing the sh file
samplejob_command = "/home/ubuntu/sample_0.1/sample/sample_run.sh "

# these are the arguments used by the bash operator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 6),
    'email': ['pooja.kataria@complereinfosystem.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# instantiate a DAG
dag = DAG('samplejob_5', default_args=default_args, schedule_interval="*/5 * * * *")

# tasks are given some specific arguments; we can also override the default_args here
t1 = BashOperator(task_id='samplejob_run5', bash_command=samplejob_command, dag=dag)
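After the file is placed in the dags folder, the DAG can be checked from the command line before relying on the scheduler (a sketch using standard Airflow 1.x CLI commands; the dag id and task id come from the script above):
airflow list_dags
airflow test samplejob_5 samplejob_run5 2019-01-06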
Once the DAG file is created and added to the dags folder, we should restart the Airflow webserver and the Airflow scheduler so that we can see the Talend job DAG in the Airflow UI. By default all the DAGs are disabled, so we enable them from the UI, and the DAG will start running from the start_date mentioned in the .py script, backfilling all the runs which were missed.
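As an alternative to the UI toggle, a DAG can also be enabled from the command line (a sketch; unpause is the standard Airflow 1.x command and samplejob_5 is the dag id from the script above):
airflow unpause samplejob_5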
Now we can enjoy running the jobs from the Airflow webserver!
Following are the tips and tricks which were helpful to us while doing the above work.
Tips and Tricks:
- Whenever we get a session expired error, we have to run airflow initdb and the Airflow webserver will run successfully again
- By default all the DAGs are disabled, so we enable them from the UI
- We have to run the webserver to get the UI and the Airflow scheduler to trigger the jobs which are scheduled.
- We should have both the webserver and scheduler running.
- We can even run the webserver in the background by passing the -D flag: sudo airflow webserver -D
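A minimal sketch of running both processes in the background (the -D daemon flag is available for both commands in Airflow 1.x):
sudo airflow webserver -D
sudo airflow scheduler -D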