Airflow is a platform to programmatically author, schedule and monitor workflows.

We use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. We had heard about Airflow but never knew how to get it working for Talend jobs; it turns out to be quite easy, and it gives us a UI for scheduling and monitoring the Talend jobs.

Let's go through the to-do list to achieve the goal:

  1. Launching the EC2 instance: We will launch an Ubuntu server for the Airflow installation and for copying the Talend jobs onto the server
  2. Installing Airflow on the EC2 instance: We will follow the installation steps and get the Airflow webserver working
  3. Adding the Talend job and creating the DAG file

Launching an EC2 instance in AWS

  • We will launch an Ubuntu 16.04 instance for Airflow

Installing Airflow on the Ubuntu Server

Let's start by updating the package lists with the following command:

sudo apt-get update

We will switch to the root user before installing the Airflow webserver:

sudo -i

To install Airflow we first need Python and pip:

sudo apt-get install python-setuptools -y
sudo apt-get install python-pip -y
sudo pip install --upgrade pip

We will copy the pip executable from /usr/local/bin to /usr/bin:

sudo cp -v /usr/local/bin/pip /usr/bin/pip

Now we can start the installation. Before installing, we have to set the environment variable SLUGIFY_USES_TEXT_UNIDECODE=yes.

pip install apache-airflow

If it keeps giving an error, run the command with the variable prefixed: SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow. Apache Airflow will then be installed.

As Talend jobs need Java to run, we will now install Java with the following commands:

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-7-jdk
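To confirm Java is available on the server, you can check the installed version (the exact output depends on the package that was installed):

java -version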

Now we will initialise Airflow so that the default configuration files are created:

sudo airflow initdb

Change the configuration by editing the file /root/airflow/airflow.cfg, and also create the directories manually in the user's home folder (see the commands after this list):

  • airflow_home = /home/ubuntu/airflow
  • dags_folder = /home/ubuntu/airflow/dags
  • base_log_folder = /home/ubuntu/airflow/logs
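A minimal sketch of creating those directories, assuming the paths above are kept exactly as listed:

mkdir -p /home/ubuntu/airflow/dags
mkdir -p /home/ubuntu/airflow/logs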

We will initialise Airflow again so that the changes made in the configuration file take effect:

sudo airflow initdb

Next we start the Airflow webserver, which serves the Airflow UI:

sudo airflow webserver

Once the Airflow webserver has started, the UI will be available at the public IP address of the Ubuntu server on port 8080, which is the default port in the Airflow configuration.
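If you ever need to serve the UI on a different port, the webserver command accepts a port flag; a sketch assuming we keep the default 8080:

sudo airflow webserver -p 8080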

To start the scheduler, run airflow scheduler; it takes care of scheduling the jobs and running them as per the schedule given:

sudo airflow scheduler

Adding the job and creating the DAG file

  • We will add the Talend job in the user's home folder on the Ubuntu server
  • We will change the permissions of the script file which we want to run
    • chmod 777 /home/ubuntu/sample_0.1/sample/sample_run.sh
  • We will create the .py file and add it to the dags folder at /home/ubuntu/airflow/dags. Airflow runs the files in the dags folder as per the schedule_interval given there.
    • Following is the DAG file created:

      # this import is used to instantiate a DAG
      from airflow import DAG
      # this import is used to run the tasks
      from airflow.operators.bash_operator import BashOperator
      from datetime import datetime, timedelta
      # path of the sh file to run
      samplejob_command = "/home/ubuntu/sample_0.1/sample/sample_run.sh "
      # these are the default arguments used by the operators
      default_args = {
          'owner': 'airflow',
          'depends_on_past': False,
          'start_date': datetime(2019, 1, 6),
          'email': ['pooja.kataria@complereinfosystem.com'],
          'email_on_failure': False,
          'email_on_retry': False,
          'retries': 1,
          'retry_delay': timedelta(minutes=5),
      }
      # instantiate a DAG
      dag = DAG(
          'samplejob_5', default_args=default_args, schedule_interval="*/5 * * * *")
      # tasks are given some specific arguments; we can also override the default_args here
      t1 = BashOperator(
          task_id='samplejob_run5',
          bash_command=samplejob_command,
          dag=dag)

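Before relying on the scheduler, it is worth checking that Airflow picks up the DAG and that the task actually runs; a quick sketch using the Airflow 1.x CLI, with the DAG and task IDs taken from the script above:

airflow list_dags
airflow test samplejob_5 samplejob_run5 2019-01-06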

Once the DAG file is created and added to the dags folder, we should start the Airflow webserver and the Airflow scheduler again so that we can see the Talend job DAG in the Airflow UI. By default all DAGs are disabled, so we enable them from the UI; the DAG will then start running from the start_date mentioned in the .py script, backfilling all the runs which were missed.
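If you want to kick off a run immediately instead of waiting for the next scheduled interval, the DAG can also be triggered manually from the CLI (again assuming the DAG ID from the script above):

airflow trigger_dag samplejob_5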

Now we can enjoy running the jobs from the Airflow webserver!

Following are some tips and tricks which were helpful to us while doing the above work.

Tips and Tricks:

  • Whenever we get the "Session expired" error, we have to run airflow initdb again and the Airflow webserver will run successfully
  • By default all the DAGs are disabled, so we enable them from the UI
  • We have to run the webserver to get the UI and the scheduler to trigger the jobs which are scheduled
  • We should have both the webserver and the scheduler running (see the commands below)
  • We can even run the webserver in the background by passing the command as sudo airflow webserver -D
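A minimal sketch of keeping both processes running in the background; the -D (daemon) flag is the one mentioned in the last tip, and it also works for the scheduler in Airflow 1.x:

sudo airflow webserver -D
sudo airflow scheduler -D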
