PythonOperator and TaskFlow DAG

Tasks that use PythonOperator execute Python callables. You can pass keyword arguments to the callable via the op_kwargs parameter.
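To make the op_kwargs mechanism concrete, here is a minimal plain-Python sketch that runs without Airflow installed. It is a simplified model, not Airflow's actual code: call_with_context and fake_context are hypothetical names, and the assumption is that the operator merges op_kwargs into the task context and passes only the arguments the callable's signature can accept.

```python
import inspect


def call_with_context(python_callable, context, op_kwargs):
    """Toy stand-in (hypothetical) for how a PythonOperator-style task calls its function."""
    # op_kwargs are layered on top of the runtime context.
    merged = {**context, **op_kwargs}
    params = inspect.signature(python_callable).parameters
    accepts_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_var_kwargs:
        # A **kwargs parameter receives everything that is not named explicitly.
        return python_callable(**merged)
    # Otherwise pass only the keys the function explicitly names.
    return python_callable(**{k: v for k, v in merged.items() if k in params})


def print_context(ds, **kwargs):
    return f"{kwargs['my_keyword']} ran for {ds}"


def only_keyword(my_keyword):
    return my_keyword.upper()


# Assumed, heavily trimmed context; a real task context has many more keys.
fake_context = {"ds": "2021-12-01", "run_id": "manual__2021-12-01"}

result = call_with_context(print_context, fake_context, {"my_keyword": "Airflow"})
narrow_result = call_with_context(only_keyword, fake_context, {"my_keyword": "Airflow"})
print(result)
print(narrow_result)
```

The takeaway is that a callable can name individual context keys such as ds, collect the rest in **kwargs, or ignore the context and accept only its own op_kwargs.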

Airflow 2.0 adds a new style of authoring DAGs called the TaskFlow API, which removes much of the boilerplate around creating PythonOperator tasks, managing dependencies between tasks, and accessing XCom values.

Let's create a DAG that uses both PythonOperator and TaskFlow API to show how to create tasks using Python functions.

In the DAG below, the first task uses PythonOperator to print the task context, including the my_keyword parameter passed in through op_kwargs. The second and third tasks are created with the TaskFlow @task decorator, which turns Python functions into tasks without you instantiating PythonOperator yourself.

Create a file named 7_python_operator_and_taskflow_dag.py that contains the following code:

import time
from datetime import datetime
from pprint import pprint

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator


with DAG(
    dag_id="7_python_operator_and_taskflow_dag",
    schedule_interval=None,
    start_date=datetime(2021, 12, 1),
    catchup=False,
    tags=["custom"],
) as dag:
    # 1. print context using PythonOperator
    def print_context(ds, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(kwargs["my_keyword"])
        print(ds)
        return "Whatever you return gets printed in the logs"

    print_the_context = PythonOperator(
        task_id="print_the_context",
        python_callable=print_context,
        op_kwargs={"my_keyword": "Airflow"},
    )

    # 2. sleep task using TaskFlow decorator
    @task(task_id="sleep_for_5")
    def my_sleeping_function():
        """This is a function that will run within the DAG execution"""
        time.sleep(5)

    sleeping_task = my_sleeping_function()

    # 3. print context again using TaskFlow decorator
    @task(task_id="print_the_context_again")
    def print_context_again(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(kwargs["my_keyword"])
        print(ds)
        return "Whatever you return gets printed in the logs"

    print_the_context_again = print_context_again(my_keyword="Airflow")

    print_the_context >> sleeping_task >> print_the_context_again
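
The last line of the DAG chains the three tasks with the >> operator. As a rough illustration of why a chain like a >> b >> c works, here is a toy sketch in plain Python. ToyTask is a hypothetical class, not an Airflow one; it only models the idea that >> records a downstream dependency and returns its right-hand operand so the next >> applies to it.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow task; it only models `>>`."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Record `other` as running after this task.
        self.downstream.append(other.task_id)
        # Returning the right-hand side is what lets a >> b >> c chain.
        return other


first = ToyTask("print_the_context")
second = ToyTask("sleep_for_5")
third = ToyTask("print_the_context_again")

first >> second >> third

print(first.downstream)
print(second.downstream)
print(third.downstream)
```

In this model each task ends up with exactly one downstream neighbour, which matches the linear order in the DAG above: print the context, sleep, then print the context again.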