XComs DAG
Sharing data between Tasks is a common use case in Airflow. For example, one Task calls an API to get the filenames for today's data ingestion, and the downstream Tasks need those filenames to load the data.
XCom (short for cross-communication) is a native Airflow feature that lets tasks exchange metadata or small amounts of data. XComs can be "pushed" (sent) or "pulled" (retrieved). When a task pushes an XCom, the value becomes available to other tasks.
There are two ways to push a value to XCom.
- Call xcom_push on the task instance.
- Return the value from your function, and it will be pushed to XCom automatically.
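As a minimal sketch of the two push styles, the functions below could serve as python_callable targets. FakeTaskInstance is a hypothetical stand-in, not an Airflow class, so the snippet runs without a scheduler; the key name "filenames" and the file names are illustrative assumptions too.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local experiments only."""

    def __init__(self):
        self.store = {}

    def xcom_push(self, key, value):
        # The real method writes to the Airflow metadata database instead.
        self.store[key] = value


def push_explicitly(**kwargs):
    # Style 1: call xcom_push on the task instance with a key of your choice.
    ti = kwargs["ti"]
    ti.xcom_push(key="filenames", value=["a.csv", "b.csv"])


def push_by_return(**kwargs):
    # Style 2: return the value; Airflow stores it under the key "return_value".
    return ["a.csv", "b.csv"]


fake_ti = FakeTaskInstance()
push_explicitly(ti=fake_ti)
returned = push_by_return(ti=fake_ti)
```

The explicit style is useful when one task needs to publish several values under different keys; the return style is shorter when there is a single result.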
When a Task (an instance of an Operator) runs, it receives a copy of the Task Instance. When python_callable is used inside a PythonOperator Task, you can get the task instance object via ti = kwargs['ti']. After that, you can call the xcom_pull function to retrieve the XCom value.
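The pull side can be sketched the same way. Here FakeTaskInstance is again a hypothetical stand-in that only mimics the task_ids and key arguments of xcom_pull, and the key "filenames" and the stored values are assumptions made for illustration.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local experiments only."""

    def __init__(self, xcoms):
        # Maps (task_id, key) to a previously pushed value.
        self.xcoms = xcoms

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self.xcoms.get((task_ids, key))


def pull_function(**kwargs):
    ti = kwargs["ti"]
    # Without a key, xcom_pull reads the value the upstream task returned.
    returned = ti.xcom_pull(task_ids="push_task")
    # With a key, it reads a value that was pushed under that name.
    named = ti.xcom_pull(task_ids="push_task", key="filenames")
    return returned, named


fake_ti = FakeTaskInstance(
    {
        ("push_task", "return_value"): ["a", "b", "c"],
        ("push_task", "filenames"): ["a.csv", "b.csv"],
    }
)
result = pull_function(ti=fake_ti)
```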
Let's create a DAG that exchanges a value between tasks.
Create a file named 6_xcoms_dag.py that contains the following code:
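from datetime import datetime

from airflow.models import DAG
# In Airflow 2, this import path is airflow.operators.python.
from airflow.operators.python_operator import PythonOperator

# Use a lowercase name so the instance does not shadow the DAG class.
dag = DAG(
    dag_id="6_xcoms_dag",
    start_date=datetime.now(),
    catchup=False,
    schedule_interval="@once",
    tags=["custom"],
)


def push_function(**kwargs):
    # The returned list is pushed to XCom automatically.
    ls = ["a", "b", "c"]
    return ls


# provide_context=True is only needed in Airflow 1.10; Airflow 2 passes the context by default.
push_task = PythonOperator(
    task_id="push_task", python_callable=push_function, provide_context=True, dag=dag
)


def pull_function(**kwargs):
    # Read the value that push_task returned.
    ti = kwargs["ti"]
    ls = ti.xcom_pull(task_ids="push_task")
    print(ls)


pull_task = PythonOperator(
    task_id="pull_task", python_callable=pull_function, provide_context=True, dag=dag
)

# pull_task runs only after push_task has finished.
push_task >> pull_task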
In the Airflow UI, the XCom tab next to Log shows the XCom values.
Let's check pull_task. Yes, the value was retrieved.
XCom values are stored in the Airflow metadata database and are visible in the UI and logs, so avoid putting sensitive information or large data in them.
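One common way to keep XComs small is to pass a reference to the data rather than the data itself. The sketch below assumes both tasks can see the same local filesystem, which may not hold on a multi-worker setup; the helper name load_rows and the sample rows are hypothetical.

```python
import json
import os
import tempfile


def push_function(**kwargs):
    # Stands in for a dataset that would be too large for XCom.
    rows = [{"id": i} for i in range(1000)]
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w") as f:
        json.dump(rows, f)
    # Only this short path string would be stored as the XCom value.
    return path


def load_rows(path):
    # A downstream task would pull the path from XCom and read the file.
    with open(path) as f:
        return json.load(f)


path = push_function()
rows = load_rows(path)
os.remove(path)  # clean up the temporary file
```

When workers do not share a disk, the same idea applies with a path into shared storage in place of the temporary file.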