Introduction to Airflow

    Running a task in Airflow

    You've just started looking at using Airflow within your company and would like to try running a task on the Airflow platform. You remember that you can use the airflow run command to execute a specific task within a workflow. Note that if airflow run hits an error, the last line of output will contain airflow.exceptions.AirflowException:.

    An Airflow DAG is set up for you with a dag_id of etl_pipeline. The task_id is download_file and the start_date is 2020-01-08. All other components needed are defined for you.

    Which command would you enter in the console to run the desired task? --> airflow run etl_pipeline download_file 2020-01-08

    Examining Airflow commands

    While researching how to use Airflow, you start to wonder about the airflow command in general. You realize that by simply running airflow you can get further information about various sub-commands that are available.

    • Running airflow by itself will list the sub-commands.
    • For further detail, you can use the airflow -h command.

    Defining a simple DAG

    You've spent some time reviewing the Airflow components and are interested in testing out your own workflows. To start, you decide to define the default arguments and create a DAG object for your workflow.

    The datetime object has been imported for you.

    Import the Airflow DAG object. Note that it is case-sensitive. --> from airflow.models import DAG

    1. Define the default_args dictionary with a key owner and a value of 'dsmith'.
    2. Add a start_date of January 14, 2020 to default_args using the value 1 for the month of January.
    3. Add a retries count of 2 to default_args.

    --> Define the default_args dictionary

    default_args = {
        'owner': 'dsmith',
        'start_date': datetime(2020, 1, 14),
        'retries': 2
    }
    1. Instantiate the DAG object into a variable called etl_dag, giving the DAG the name example_etl.
    2. Add the default_args dictionary to the appropriate argument.

    --> etl_dag = DAG('example_etl', default_args=default_args)
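
    Assembling the two answers, a minimal sketch of the complete file might look like the following (the datetime import is written out here, even though the exercise imports it for you):

    from datetime import datetime
    from airflow.models import DAG

    # Default arguments applied to every task in this DAG
    default_args = {
        'owner': 'dsmith',
        'start_date': datetime(2020, 1, 14),
        'retries': 2
    }

    # Instantiate the DAG with its name and the default arguments
    etl_dag = DAG('example_etl', default_args=default_args)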

    Starting the Airflow webserver

    You've successfully created some DAGs within Airflow using the command-line tools, but notice that it can be a bit tricky to handle scheduling, troubleshooting, and so on. After reading the documentation further, you realize that you'd like to access the Airflow web interface. For security reasons, you'd like to start the webserver on port 9090.

    Which airflow command would you use to start the webserver on port 9090? --> airflow webserver -p 9090

    Airflow is installed and accessible from the command line. Remember to use the airflow -h command if needed. airflow <subcommand> -h will provide further detail.

    Defining a BashOperator task

    The BashOperator allows you to specify any given shell command or script and add it to an Airflow workflow. This can be a great start to implementing Airflow in your environment.

    As such, you've been running some scripts manually to clean data (using a script called cleanup.sh) prior to delivery to your colleagues in the Data Analytics group. As you get more of these tasks assigned, you've realized it's becoming difficult to keep up with running everything manually, much less dealing with errors or retries. You'd like to implement a simple script as an Airflow operator.

    The Airflow DAG analytics_dag is already defined for you and has the appropriate configurations in place.

    1. Import the BashOperator object.
    2. Define a BashOperator called cleanup with the task_id of cleanup_task.
    3. Use the command cleanup.sh.
    4. Add the operator to the DAG.
    # Import the BashOperator
    from airflow.operators.bash_operator import BashOperator

    # Define the BashOperator
    cleanup = BashOperator(
        task_id='cleanup_task',
        # Define the bash_command
        bash_command='cleanup.sh',
        # Add the task to the dag
        dag=analytics_dag
    )

    Multiple BashOperators

    Airflow DAGs can contain many operators, each performing its defined task.

    You've successfully implemented one of your scripts as an Airflow task and have decided to continue migrating your individual scripts to a full Airflow DAG. You now want to add more components to the workflow. In addition to the cleanup.sh used in the previous exercise, you have two more scripts, consolidate_data.sh and push_data.sh. These further process your data and copy it to its final location.

    The DAG analytics_dag is available as before, and your cleanup task is still defined. The BashOperator is already imported.
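
    Following the same pattern as the cleanup task, a sketch of the two additional tasks might look like the one below. The task_id values consolidate_task and pushdata_task are assumptions for illustration; use whatever ids the exercise asks for.

    # Define a second operator to run the consolidation script
    consolidate = BashOperator(
        task_id='consolidate_task',
        bash_command='consolidate_data.sh',
        dag=analytics_dag
    )

    # Define a third operator to copy the data to its final location
    push_data = BashOperator(
        task_id='pushdata_task',
        bash_command='push_data.sh',
        dag=analytics_dag
    )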