Branching DAG

When designing data pipelines, you may need task flows more complex than a straight chain such as Task A >> Task B >> Task C. For example, a pipeline might have to choose which tasks to execute based on the result of an upstream task. Airflow calls this branching, and it provides a dedicated operator, BranchPythonOperator, for this use case.

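The BranchPythonOperator takes a Python function (its python_callable) as input. The function must return the task ID, or a list of task IDs, of the directly downstream task(s) to run next. Airflow follows the returned tasks and skips the other tasks directly downstream of the branch operator.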
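To make that contract concrete, here is a plain-Python sketch (no Airflow required) that models what happens after the callable returns: a single task ID is treated as a one-element collection, the chosen direct downstream tasks are followed, and the remaining direct downstream tasks are skipped. The `resolve_branch` helper is hypothetical and deliberately simplified; it is not Airflow's implementation.

```python
from typing import Iterable, Set, Tuple, Union


def resolve_branch(
    returned: Union[str, Iterable[str]],
    downstream_task_ids: Iterable[str],
) -> Tuple[Set[str], Set[str]]:
    """Return (followed, skipped) task IDs for one branch decision.

    Hypothetical, simplified model of branching: a single task ID is
    normalized to a set, and every direct downstream task that was not
    chosen ends up in the skipped set.
    """
    chosen = {returned} if isinstance(returned, str) else set(returned)
    downstream = set(downstream_task_ids)
    unknown = chosen - downstream
    if unknown:
        # This sketch rejects IDs that are not direct downstream tasks.
        raise ValueError(f"not a direct downstream task: {sorted(unknown)}")
    return chosen, downstream - chosen


# Single task ID, as in the example DAG that follows.
followed, skipped = resolve_branch(
    "even_number_transform", ["even_number_transform", "odd_number_transform"]
)
print(sorted(followed), sorted(skipped))

# A list of task IDs fans out to several branches at once.
followed, skipped = resolve_branch(["load_a", "load_b"], ["load_a", "load_b", "load_c"])
print(sorted(followed), sorted(skipped))
```

The task IDs `load_a`, `load_b`, and `load_c` are illustrative only; they show why returning a list is useful when more than one branch should run.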

To give you an example of branching, let's create a DAG that decides which of two no-op transforms to execute based on whether the current Unix timestamp is even or odd.

Create a file named 5_branching_dag.py that contains the following code:

```python
from datetime import datetime
import time

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id="5_branching_dag",
    start_date=datetime(2021, 12, 1),
    catchup=False,
    schedule_interval="@daily",
    tags=["custom"],
) as dag:
    # No-op stand-in for a task that fetches source data.
    get_source = DummyOperator(task_id="get_source")

    def branch_func():
        # Return the task_id of the branch to follow; the other
        # directly downstream task is skipped.
        if int(time.time()) % 2 == 0:
            return "even_number_transform"
        else:
            return "odd_number_transform"

    branch_check = BranchPythonOperator(
        task_id="branch_check", python_callable=branch_func
    )

    # The two candidate branches; their task_ids must match the
    # values returned by branch_func.
    even_number_transform = DummyOperator(task_id="even_number_transform")
    odd_number_transform = DummyOperator(task_id="odd_number_transform")

    # Both transforms sit directly downstream of branch_check.
    get_source >> branch_check >> [even_number_transform, odd_number_transform]
```
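Because `branch_func` reads the clock directly, its decision is hard to check outside a running DAG. One option, sketched here as an assumption rather than part of the tutorial, is to move the decision into a pure function that takes the timestamp as a parameter, plus a `random.choice` variant for comparison. The names `choose_transform`, `choose_transform_randomly`, and `TRANSFORMS` are hypothetical.

```python
import random
import time
from typing import Optional

# Task IDs of the two candidate branches in the example DAG.
TRANSFORMS = ["even_number_transform", "odd_number_transform"]


def choose_transform(timestamp: Optional[float] = None) -> str:
    """Pick a transform task ID from the parity of a Unix timestamp.

    Same logic as branch_func; accepting the timestamp makes it testable.
    Falls back to the current time when no timestamp is given.
    """
    if timestamp is None:
        timestamp = time.time()
    if int(timestamp) % 2 == 0:
        return "even_number_transform"
    return "odd_number_transform"


def choose_transform_randomly(rng: Optional[random.Random] = None) -> str:
    """Alternative: pick one of the transforms uniformly with random.choice."""
    rng = rng or random.Random()
    return rng.choice(TRANSFORMS)


print(choose_transform(1638316800))  # even timestamp
print(choose_transform(1638316801))  # odd timestamp
print(choose_transform_randomly(random.Random(42)))
```

Since `timestamp` has a default, `choose_transform` could likely be passed as `python_callable=choose_transform` in place of `branch_func`; verify this against the Airflow version you run, because how the operator passes arguments to the callable differs between releases.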

Branching dag

The DAG's run history shows that different transform tasks ran in different runs. In each run, the branch that was not chosen is marked as skipped.

Branching dag runs