Create nudges DAG

When designing a DAG, we often start with the critical tasks, such as loading, transforming, and exporting the data. Then, iteratively, we add other tasks, such as checking whether the BigQuery Dataset exists or whether today's load has already happened.

Key tasks

To generate the nudges for the customers, we can load the daily exported CSV files into three tables in BigQuery. After that, we can run a SQL query that joins the three tables, creates the nudge information, and stores the results in another table. The DAG looks like this:

A diagram of the case study DAG

As the CSV files are on Google Cloud Storage (GCS) and we need to load them into BigQuery, we need an Operator that can move data from GCS to BigQuery. Since this is a pretty generic job, let's search the Astronomer Registry to see whether Airflow already provides one in its built-in libraries:

Screenshot showing the search UI

Yes, we found the GCSToBigQueryOperator. Following its documentation, let's create our three data load tasks.
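
The operator lives in the Google Cloud provider package, so the DAG file will need an import like this:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator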

Create a file named 9_generate_nudges_dag.py that contains the following code:

    load_accounts_csv = GCSToBigQueryOperator(
        task_id="load_accounts_csv",
        bucket=STORE_RAW_DATA_BUCKET,
        source_objects=[f"accounts_{CURRENT_DATE}.csv"],
        destination_project_dataset_table=f"{DATASET_ID}.accounts",
        schema_fields=[
            {"name": "account_id", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "account_name", "type": "STRING", "mode": "NULLABLE"},
            {"name": "email", "type": "STRING", "mode": "NULLABLE"},
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    load_activities_csv = GCSToBigQueryOperator(
        task_id="load_activities_csv",
        bucket=STORE_RAW_DATA_BUCKET,
        source_objects=[f"activities_{CURRENT_DATE}.csv"],
        destination_project_dataset_table=f"{DATASET_ID}.activities",
        schema_fields=[
            {"name": "account_id", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "item_id", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "visit_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    load_items_csv = GCSToBigQueryOperator(
        task_id="load_items_csv",
        bucket=STORE_RAW_DATA_BUCKET,
        source_objects=[f"items_{CURRENT_DATE}.csv"],
        destination_project_dataset_table=f"{DATASET_ID}.items",
        schema_fields=[
            {"name": "item_id", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "item_name", "type": "STRING", "mode": "NULLABLE"},
            {"name": "price", "type": "FLOAT", "mode": "NULLABLE"},
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    generate_nudges = GenerateNudgesOperator(
        task_id="generate_nudges",
        nudges_query=NUDGES_QUERY,
        destination_dataset_table="analytics.nudges",
    )
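
These task snippets are excerpts from 9_generate_nudges_dag.py, so they assume some module-level constants and a surrounding DAG definition. As a minimal sketch (the DAG id, schedule, start date, dataset id, and the date format in the file names are assumptions, not taken from the chapter), the scaffolding could look like this:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Assumes generate_nudges_operator.py sits somewhere importable, e.g. next to the DAG file
from generate_nudges_operator import GenerateNudgesOperator

PROJECT_ID = "derrick-sandbox"
STORE_RAW_DATA_BUCKET = "store_raw_data"
DATASET_ID = "store"  # assumption: replace with your BigQuery dataset id
# Assumption: the exported CSV files use a YYYY-MM-DD suffix in their names
CURRENT_DATE = datetime.today().strftime("%Y-%m-%d")

with DAG(
    dag_id="generate_nudges",         # assumption
    schedule_interval="@daily",       # assumption: the pipeline runs once a day
    start_date=datetime(2021, 1, 1),  # assumption
    catchup=False,
) as dag:
    # The load_*_csv and generate_nudges tasks shown above go here,
    # indented inside the DAG context (which is why the snippets are indented).
    ...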

The generate_nudges task runs a BigQuery query and saves the results into a BigQuery table. It may look like an ordinary job, but unfortunately there isn't an existing Operator that does exactly what we need, so we have to create a custom Operator. Inside this custom Operator, we can use the built-in BigQueryHook to interact with BigQuery; its source code is available on GitHub.

Let's call the custom Operator GenerateNudgesOperator.

Create a file named generate_nudges_operator.py that contains the following code:

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.utils.decorators import apply_defaults


class GenerateNudgesOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        *,
        nudges_query: str,
        destination_dataset_table: str,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.gcp_conn_id = gcp_conn_id
        self.nudges_query = nudges_query
        self.destination_dataset_table = destination_dataset_table

    def execute(self, context):
        self.log.info(
            f"Generating nudges with query {self.nudges_query} and write into {self.destination_dataset_table}..."
        )

        hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)

        hook.run_query(
            sql=self.nudges_query,
            destination_dataset_table=self.destination_dataset_table,
            write_disposition="WRITE_TRUNCATE",
        )

And the task looks like this:

    generate_nudges = GenerateNudgesOperator(
        task_id="generate_nudges",
        nudges_query=NUDGES_QUERY,
        destination_dataset_table="analytics.nudges",
    )
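
For reference, NUDGES_QUERY is ordinary SQL that joins the three tables we just loaded. The chapter doesn't spell out the exact nudge rules, so the query below is only an illustrative sketch built from the column names in the schemas above (it reuses the DATASET_ID constant from the load tasks):

NUDGES_QUERY = f"""
SELECT
    accounts.account_id,
    accounts.account_name,
    accounts.email,
    items.item_name,
    items.price,
    activities.visit_time
FROM `{DATASET_ID}.activities` AS activities
JOIN `{DATASET_ID}.accounts` AS accounts
    ON accounts.account_id = activities.account_id
JOIN `{DATASET_ID}.items` AS items
    ON items.item_id = activities.item_id
"""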

Add other tasks

Check BigQuery Dataset

As a safety measure, we should add a task to check if the BigQuery Dataset exists before loading the CSV files.

A quick look through Airflow's built-in Operators shows there isn't one for this check, so let's create another custom Operator called CheckBigQueryDatasetOperator. Because it needs to access BigQuery, we can use the BigQueryHook again.

Create a file named check_bigquery_dataset_operator.py that contains the following code:

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.utils.decorators import apply_defaults


class CheckBigQueryDatasetOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        *,
        dataset_id,
        gcp_conn_id="google_cloud_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.gcp_conn_id = gcp_conn_id
        self.dataset_id = dataset_id

    def execute(self, context):
        self.log.info(f"Checking dataset {self.dataset_id}...")

        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
        )

        # Check whether the dataset exists
        datasets_list = hook.get_datasets_list()
        self.log.info(f"datasets_list: {datasets_list}")
        dataset_exists = False
        if datasets_list:
            datasets_id_list = [dataset.dataset_id for dataset in datasets_list]
            dataset_exists = self.dataset_id in datasets_id_list

        if dataset_exists:
            return True
        else:
            raise Exception(f"Dataset id {self.dataset_id} not found")

And the task looks like this:

    check_bigquery_dataset = CheckBigQueryDatasetOperator(
        task_id="check_bigquery_dataset",
        dataset_id=DATASET_ID,
    )

Now the DAG looks like this:

A diagram of the changed DAG

Avoid duplicated runs

It is always a good idea to check whether today's run has already finished. There are multiple ways to do that. If you have numerous data pipelines, you could build an API that records their runs: when a pipeline is kicked off, it calls the API to check whether that day's run has already finished, and if so, the DAG run should not continue with the following tasks.
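
As a rough illustration of that approach only (the run-registry API, its URL, and its response format are hypothetical, not part of this case study), such a check could look something like this:

import requests


def has_run_finished(api_base_url: str, pipeline_name: str, run_date: str) -> bool:
    """Ask a hypothetical run-registry API whether this pipeline's run for run_date has finished."""
    response = requests.get(
        f"{api_base_url}/runs",
        params={"pipeline": pipeline_name, "date": run_date},
        timeout=10,
    )
    response.raise_for_status()
    # Assumed response shape: {"finished": true} or {"finished": false}
    return response.json().get("finished", False)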

In our case study, as we only have one data pipeline, we can assume that:

  • The email system sends the emails out every day
  • The data pipeline uploads a file to GCS (using the current date as the filename)

With these assumptions, a few tasks can be added to finalize the DAG.

A diagram of the final DAG

Let's dig into the new tasks.

Get latest run date

As mentioned above, we can check whether a file named after the current date exists in the GCS bucket. To do this, we can use the GCSListObjectsOperator from the Airflow built-in libraries. The task looks like this:

    get_latest_run_date = GCSListObjectsOperator(
        task_id="get_latest_run_date",
        bucket=NUDGES_HISTORY_BUCKET,
        prefix=CURRENT_DATE,
    )
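
Like the load operators, GCSListObjectsOperator comes from the Google provider package:

from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator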

From the source code on GitHub, its execute function returns a list of the matching object names in the bucket (an empty list if the file does not exist). In Airflow, any value returned by an Operator's execute function is stored in XCom.

Now that we have a value in XCom, let's move on to the next task.

Check run date

In this task, we can use the BranchPythonOperator to decide whether this particular run should continue. The task looks like this:

    check_run_date = BranchPythonOperator(
        task_id="check_run_date", python_callable=branch_func
    )

And the branch_func:

def branch_func(ti):
    # ti is the Task Instance; we use it to retrieve the XCom value
    xcom_return_value = ti.xcom_pull(task_ids="get_latest_run_date", key="return_value")
    log.info(f"Xcom_return_value: {xcom_return_value}")
    if xcom_return_value:
        log.info("Today's run already exists, finishing the dag...")
        return "finish_run"
    else:
        log.info("Today's run does not exist, kicking off the run...")
        return "kick_off_run"

From here, Airflow will decide whether the DAG run should continue loading data or finish. To make the pipeline more user-friendly, we can use two DummyOperator tasks to represent kick_off_run and finish_run.

    kick_off_run = DummyOperator(task_id="kick_off_run")
    finish_run = DummyOperator(task_id="finish_run")
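
The text above doesn't list the task dependencies explicitly, but based on the final DAG diagram, the wiring at the bottom of the DAG file should look roughly like this:

    get_latest_run_date >> check_run_date >> [kick_off_run, finish_run]
    kick_off_run >> check_bigquery_dataset
    check_bigquery_dataset >> [load_accounts_csv, load_activities_csv, load_items_csv]
    [load_accounts_csv, load_activities_csv, load_items_csv] >> generate_nudges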

Summary

In this chapter, we've walked through designing and creating a DAG that loads data from three CSV files and generates nudges in BigQuery. If you'd like to run it yourself, be sure to replace the project ID and bucket names in the DAG file:

PROJECT_ID = "derrick-sandbox"
NUDGES_HISTORY_BUCKET = "nudges_history"
STORE_RAW_DATA_BUCKET = "store_raw_data"

Or you can wait until the next chapter, in which I will cover the testing strategy, including a quick end-to-end test that can generate test files and trigger the DAG.