What You'll Learn
The .expand function in Airflow allows dynamic task mapping, enabling tasks or task groups to process multiple inputs concurrently. When combined with task groups, .expand facilitates the creation of mapped instances of task groups, each operating on distinct inputs.
Below is a detailed guide to using .expand within task groups and to passing XCom values with ti.xcom_pull and map_indexes.
Overview of Task Groups
Task groups in Airflow are a tool for organizing tasks visually and logically within a DAG. They are particularly useful for:
- Structuring complex workflows into manageable units.
- Dynamically mapping over groups of tasks for parallel execution.
- Reusing modular patterns across DAGs.
Task groups can be defined using either:
- The @task_group decorator
- The TaskGroup class
For dynamic mapping, the @task_group decorator is required.
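For reference, here is a minimal sketch of both definition styles side by side; the DAG name task_group_styles, the group IDs, and the do_work task are placeholders chosen for illustration. Only the decorated group can later be mapped with .expand():

from airflow.decorators import dag, task, task_group
from airflow.utils.task_group import TaskGroup
from pendulum import datetime

@dag(schedule_interval=None, start_date=datetime(2023, 1, 1), catchup=False)
def task_group_styles():

    @task
    def do_work(value):
        return value

    # Decorator style: supports dynamic mapping with .expand()
    @task_group(group_id="decorated_group")
    def decorated_group(value):
        do_work(value)

    decorated_group("example")

    # Class style: useful for visual grouping, but cannot be expanded
    with TaskGroup(group_id="classic_group"):
        do_work("example")

task_group_styles()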
Using .expand with Task Groups
Defining a Task Group
A task group is defined as a container for multiple related tasks that execute sequentially or in parallel. Here’s an example:
from airflow.decorators import task, task_group

@task_group(group_id="process_table")
def process_table(table_config):

    @task(task_id="generate_ddl_query")
    def generate_ddl_query(config):
        # Logic to generate the DDL query
        return {"ddl": "CREATE TABLE ...", "columns": ["col1", "col2"], "primary_keys": ["id"]}

    @task(task_id="create_snowflake_table")
    def create_snowflake_table(**context):
        ti = context["ti"]
        ddl_result = ti.xcom_pull(
            task_ids="process_table.generate_ddl_query",
            map_indexes=ti.map_index,
        )
        snowflake_ddl = ddl_result["ddl"]
        # Execute the DDL in Snowflake
        return snowflake_ddl

    generate_ddl_query(table_config) >> create_snowflake_table()
Expanding the Task Group
The .expand() function dynamically maps over the inputs, creating multiple instances of the task group based on the provided list or iterable.
table_configs = [
    {"sf_schema": "schema1", "sf_table": "table1"},
    {"sf_schema": "schema2", "sf_table": "table2"},
]

process_table.expand(table_config=table_configs)
This creates two instances of the process_table task group, each processing one configuration from table_configs.
Passing XCom Values Using ti.xcom_pull and map_indexes
Purpose of map_indexes
When working with mapped tasks or task groups, XCom values are generated for each mapped instance. The map_indexes parameter ensures that only the XCom value corresponding to the current mapped instance is retrieved.
Example Usage
Within a mapped task group:
@task(task_id="create_snowflake_table")
def create_snowflake_table(**context):
ti = context['ti']
# Pull XCom data from the mapped instance using map_indexes
ddl_result = ti.xcom_pull(task_ids="process_table.generate_ddl_query", map_indexes=ti.map_index)
snowflake_ddl = ddl_result["ddl"]
# Execute Snowflake DDL query
return snowflake_ddl
@task(task_id="create_snowflake_table")
def create_snowflake_table(**context):
ti = context['ti']
# Pull XCom data from the mapped instance using map_indexes
ddl_result = ti.xcom_pull(task_ids="process_table.generate_ddl_query", map_indexes=ti.map_index)
snowflake_ddl = ddl_result["ddl"]
# Execute Snowflake DDL query
return snowflake_ddl
Key Parameters
- task_ids: Specifies the upstream task ID (e.g., "process_table.generate_ddl_query")
- map_indexes: Restricts the pull to the XCom value written by a specific mapped instance; passing the current task's ti.map_index retrieves the data belonging to that instance
If map_indexes is omitted, the XCom values of all mapped instances are returned as a lazy iterable.
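For example, a task placed downstream of the mapped group could collect every instance's result by omitting map_indexes. This is a minimal sketch; the summarize_ddl task is hypothetical and not part of the example DAG:

from airflow.decorators import task

@task(task_id="summarize_ddl")
def summarize_ddl(**context):
    ti = context["ti"]
    # No map_indexes: returns the XCom values of all mapped instances
    all_results = ti.xcom_pull(task_ids="process_table.generate_ddl_query")
    # Materialize the lazy sequence and count the generated statements
    ddl_statements = [result["ddl"] for result in all_results]
    return len(ddl_statements)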
Dynamic Mapping Workflow Example
Below is an example workflow that demonstrates dynamic mapping with task groups:
from airflow.decorators import dag, task_group, task
from pendulum import datetime

@dag(schedule_interval=None, start_date=datetime(2023, 1, 1), catchup=False)
def postgres_to_snowflake():
    table_configs = [
        {"sf_schema": "schema1", "sf_table": "table1", "pg_schema": "public", "pg_table": "table1"},
        {"sf_schema": "schema2", "sf_table": "table2", "pg_schema": "public", "pg_table": "table2"},
    ]

    @task_group(group_id="process_table")
    def process_table(table_config):

        @task(task_id="generate_ddl_query")
        def generate_ddl_query(config):
            # Generate DDL logic
            return {
                "ddl": f"CREATE TABLE {config['sf_schema']}.{config['sf_table']}",
                "columns": ["id", "name"],
                "primary_keys": ["id"],
            }

        @task(task_id="create_snowflake_table")
        def create_snowflake_table(**context):
            ti = context["ti"]
            ddl_result = ti.xcom_pull(
                task_ids="process_table.generate_ddl_query",
                map_indexes=ti.map_index,
            )
            snowflake_ddl = ddl_result["ddl"]
            # Execute DDL logic here
            return snowflake_ddl

        generate_ddl_query(table_config) >> create_snowflake_table()

    process_table.expand(table_config=table_configs)

dag_instance = postgres_to_snowflake()
Best Practices and Additional Notes for Task Group Expansion in Airflow
When working with task groups and dynamic mapping in Airflow, adhering to best practices ensures maintainability, scalability, and clarity of workflows. Additionally, understanding the nuances of task group expansion helps avoid common pitfalls. Below are some combined best practices and additional notes for using .expand effectively with task groups.
Best Practices
Avoid Excessive Nesting
Avoid deeply nested task groups within mapped ones, as it can lead to UI complexity and make debugging harder. Instead, flatten your workflow design where possible for better readability.
Use Descriptive IDs
Ensure that both tasks and task groups have unique and descriptive IDs. This avoids conflicts and makes it easier to identify tasks in the Airflow UI.
Leverage Tooltips and Labels
Use tooltips or labels on dependencies to provide additional context for your tasks or task groups. This enhances clarity when visualizing workflows in the Airflow UI.
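For instance, a tooltip can be set on the task group and an edge label placed on a dependency. This is a minimal sketch; the tooltip text and the "DDL ready" label are illustrative, and Label comes from airflow.utils.edgemodifier:

from airflow.decorators import task, task_group
from airflow.utils.edgemodifier import Label

@task_group(group_id="process_table", tooltip="Creates one Snowflake table per config")
def process_table(table_config):

    @task
    def generate_ddl_query(config):
        return {"ddl": "CREATE TABLE ..."}

    @task
    def create_snowflake_table():
        # Pull the DDL via XCom as shown earlier
        ...

    # Label the dependency so the edge is annotated in the Graph view
    generate_ddl_query(table_config) >> Label("DDL ready") >> create_snowflake_table()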
Monitor Resource Usage
Optimize concurrency settings and resource allocation when expanding over large input lists to avoid overloading your infrastructure. For example, limit parallelism or use max_active_tasks to control resource usage, as sketched below.
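Here is a brief sketch of how such limits might be applied; the DAG name throttled_mapping and the values 8 and 4 are illustrative. max_active_tasks caps concurrency at the DAG level, while max_active_tis_per_dag caps how many mapped copies of a single task run at once:

from airflow.decorators import dag, task
from pendulum import datetime

@dag(
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    max_active_tasks=8,  # at most 8 task instances of this DAG run concurrently
)
def throttled_mapping():

    # Allow at most 4 mapped copies of this task to run at the same time
    @task(max_active_tis_per_dag=4)
    def load_table(table_config):
        return table_config["sf_table"]

    load_table.expand(table_config=[{"sf_table": f"table{i}"} for i in range(20)])

throttled_mapping()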
Additional Notes
Dynamic Mapping Behavior
Task group expansion dynamically maps all tasks within the group against the same input parameters. This means that each instance of the task group will process one set of inputs from the provided iterable.
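With the two-element table_configs from the example above, for instance, Airflow creates two mapped instances of process_table: map index 0 processes the schema1/table1 config and map index 1 processes the schema2/table2 config, and every task inside each instance sees only that instance's input.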
Using .partial() with .expand()
Use .partial() alongside .expand() to pass constant parameters that do not change across mapped instances. For example:
process_table.partial(constant_param="value").expand(table_config=table_configs)
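A brief sketch of how this might look with the earlier example; the constant_param argument and its use as a warehouse name are hypothetical, while table_config continues to vary per mapped instance:

from airflow.decorators import task, task_group

@task_group(group_id="process_table")
def process_table(table_config, constant_param):

    @task
    def generate_ddl_query(config, warehouse):
        # warehouse stays constant across mapped instances;
        # config differs per instance
        return f"CREATE TABLE {config['sf_table']} -- warehouse: {warehouse}"

    generate_ddl_query(table_config, constant_param)

# Inside a DAG definition:
process_table.partial(constant_param="value").expand(table_config=table_configs)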
Airflow UI Visibility
The Airflow UI provides detailed visibility into individual mapped instances within a task group. This feature is invaluable for debugging and monitoring purposes, as you can view the status of each mapped instance separately.