Testing

Like any other software, Airflow pipelines need to be tested. We usually write unit tests and end-to-end tests to make sure a pipeline works as expected before it is deployed.

In our project, Pytest is used as the test runner.

Unit tests

When testing Airflow, the first thing we want to do is make sure that every DAG to be deployed can be loaded without errors, such as syntax errors or library import errors. The test below does exactly that:

from airflow.models import DagBag


def test_dag_loaded():
    # Loading the DagBag parses every DAG file; any failure shows up in import_errors
    dag_bag = DagBag(include_examples=False)
    assert dag_bag.import_errors == {}
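
Beyond import errors, it is also worth asserting some basic structure, for example that a specific DAG was actually loaded and contains at least one task. Below is a minimal sketch for the nudges DAG used later in this chapter; the exact assertions (task ids, task count) will depend on your own DAG:

def test_generate_nudges_dag_structure():
    dag_bag = DagBag(include_examples=False)
    # The DAG id matches the one triggered in the end-to-end test below
    dag = dag_bag.get_dag("9_generate_nudges_dag")
    assert dag is not None
    assert len(dag.tasks) > 0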

If there are custom Operators, Hooks, and Sensors, they also need to be unit tested.

For example, with the BigQueryHook mocked, the CheckBigQueryDatasetOperator can be tested as below:

import unittest
from unittest import mock
import pytest

from operators.check_bigquery_dataset_operator import CheckBigQueryDatasetOperator
from google.cloud.bigquery.dataset import DatasetListItem


DUMMY_DATASET_ID = "dummy_dataset_id"
DUMMY_NON_EXISTED_DATASET_ID = "dummy_non_existed_dataset_id"
DUMMY_DATASETS = [
    {
        "kind": "bigquery#dataset",
        "location": "US",
        "id": "your-project:dummy_dataset_id",
        "datasetReference": {
            "projectId": "your-project",
            "datasetId": "dummy_dataset_id",
        },
    },
    {
        "kind": "bigquery#dataset",
        "location": "US",
        "id": "your-project:another_dummy_dataset_id",
        "datasetReference": {
            "projectId": "your-project",
            "datasetId": "another_dummy_dataset_id",
        },
    },
]


class TestCheckBigQueryDatasetOperator(unittest.TestCase):
    @mock.patch("operators.check_bigquery_dataset_operator.BigQueryHook")
    def test_existed_dataset(self, mock_hook):
        operator = CheckBigQueryDatasetOperator(
            task_id="dataset_exists_task",
            dataset_id=DUMMY_DATASET_ID,
        )

        mock_hook.return_value.get_datasets_list.return_value = [
            DatasetListItem(d) for d in DUMMY_DATASETS
        ]
        assert operator.execute(None) is True

    @mock.patch("operators.check_bigquery_dataset_operator.BigQueryHook")
    def test_non_existed_dataset(self, mock_hook):
        operator = CheckBigQueryDatasetOperator(
            task_id="dataset_exists_task",
            dataset_id=DUMMY_NON_EXISTED_DATASET_ID,
        )

        mock_hook.return_value.get_datasets_list.return_value = [
            DatasetListItem(d) for d in DUMMY_DATASETS
        ]

        with pytest.raises(
            Exception, match=f"Dataset id {DUMMY_NON_EXISTED_DATASET_ID} not found"
        ):
            operator.execute(None)
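
For reference, the operator under test could look roughly like the sketch below. This is not the real implementation, just a plausible version reconstructed from what the tests expect: the operator asks BigQueryHook for the list of datasets, returns True when the given dataset id is found, and raises an exception otherwise. The import path of BigQueryHook assumes the Airflow Google provider package:

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


class CheckBigQueryDatasetOperator(BaseOperator):
    """Check that a BigQuery dataset with the given id exists (sketch only)."""

    def __init__(self, dataset_id, **kwargs):
        super().__init__(**kwargs)
        self.dataset_id = dataset_id

    def execute(self, context):
        hook = BigQueryHook()
        # get_datasets_list returns DatasetListItem objects exposing dataset_id
        datasets = hook.get_datasets_list()
        if any(d.dataset_id == self.dataset_id for d in datasets):
            return True
        raise Exception(f"Dataset id {self.dataset_id} not found")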

End-to-end tests

With unit tests ensuring that the DAGs and custom plugins are sound, we also need end-to-end tests to verify that the whole pipeline runs well.

Like any other end-to-end test, we generate some input, run the pipeline, and check the output. For our DAG, the test steps are as follows (a sketch of these steps is shown after the list):

  1. Create and upload three CSV files to Google Cloud Storage (GCS)
  2. Trigger Airflow DAG
  3. Wait for 30 seconds for the DAG to finish
  4. Check the nudge table is refreshed
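
Putting these steps together, an end-to-end test could look like the sketch below. The bucket name, fixture file names, and the nudge table name are placeholders, and the freshness check is simplified to a row-count assertion; adjust them to your own project. The DAG is triggered here with the docker-compose command discussed later in this section, but a REST API call works just as well:

import subprocess
import time

from google.cloud import bigquery, storage


def test_generate_nudges_end_to_end():
    # 1. Create and upload three CSV files to GCS (placeholder bucket and paths)
    bucket = storage.Client().bucket("your-test-bucket")
    for name in ("input_1.csv", "input_2.csv", "input_3.csv"):
        bucket.blob(f"input/{name}").upload_from_filename(f"tests/fixtures/{name}")

    # 2. Trigger the Airflow DAG (here via the local docker-compose setup)
    subprocess.run(
        "docker-compose run airflow-scheduler airflow dags trigger 9_generate_nudges_dag",
        shell=True,
    )

    # 3. Wait for 30 seconds for the DAG to finish
    time.sleep(30)

    # 4. Check the nudge table is refreshed (placeholder table and check)
    table = bigquery.Client().get_table("your-project.your_dataset.nudges")
    assert table.num_rows > 0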

Ideally, this test should run in a test environment with its own test Composer instance. We can then trigger the DAG through the Airflow REST API by following Google's documentation.
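
As a rough illustration, triggering a DAG run through Airflow 2's stable REST API boils down to the request below. The web server URL is a placeholder, and the call is shown unauthenticated; against Composer you would attach a Google-issued identity token as described in the documentation mentioned above:

import requests

# Placeholder: for Composer, use the Airflow web server URL of your environment
AIRFLOW_WEB_URL = "https://your-airflow-webserver"


def trigger_dag_via_rest_api(dag_id: str) -> None:
    # POST /api/v1/dags/{dag_id}/dagRuns creates a new DAG run
    response = requests.post(
        f"{AIRFLOW_WEB_URL}/api/v1/dags/{dag_id}/dagRuns",
        json={"conf": {}},
    )
    response.raise_for_status()


trigger_dag_via_rest_api("9_generate_nudges_dag")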

I understand that it may be expensive to host a test Composer environment because it needs to run 24/7. To save cost, you can run Airflow locally with docker-compose (I will cover this approach in more detail in another chapter) and use the code below to trigger the DAG:

    def trigger_dag(self):
        # Requires `import subprocess` at the module level.
        # Runs the Airflow CLI inside the scheduler container to trigger the DAG.
        subprocess.run(
            "docker-compose run airflow-scheduler airflow dags trigger 9_generate_nudges_dag",
            shell=True,
        )

Now that we have the tests ready, let's move on to the next topic: CI and CD.