Detailed Documentation for Using .expand with Task Groups in Airflow

Learn how to use .expand with task groups in Airflow for dynamic task mapping and XCom handling, along with best practices for optimizing your workflows.

What You'll Learn

The .expand function in Airflow enables dynamic task mapping, allowing tasks or task groups to process multiple inputs concurrently. When applied to a task group, .expand creates one mapped instance of the group per input, with each instance operating on a distinct element.

Below is a detailed guide to using .expand within task groups and to passing XCom values using ti.xcom_pull with map_indexes.

Overview of Task Groups

Task groups in Airflow are a tool for organizing tasks visually and logically within a DAG. They are particularly useful for:

- Grouping related tasks so that large DAGs stay readable in the Airflow UI
- Reusing a common pattern of tasks across a DAG
- Applying dependencies to a set of tasks at once

Task groups can be defined using either:

- The TaskGroup context manager class
- The @task_group decorator from the TaskFlow API

For dynamic mapping, the @task_group decorator is required.
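
For reference, here is a minimal sketch of both definition styles inside a placeholder DAG; the DAG, group, and task names are illustrative:

from pendulum import datetime
from airflow.decorators import dag, task, task_group
from airflow.utils.task_group import TaskGroup

@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def task_group_styles():
    @task
    def step():
        pass

    # Option 1: the TaskGroup context manager
    with TaskGroup(group_id="classic_group"):
        step()

    # Option 2: the @task_group decorator (required for .expand)
    @task_group(group_id="decorated_group")
    def decorated_group():
        step()

    decorated_group()

task_group_styles()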

Using .expand with Task Groups

Defining a Task Group

A task group is defined as a container for multiple related tasks that execute sequentially or in parallel. Here’s an example:

    
from airflow.decorators import task, task_group

@task_group(group_id="process_table")
def process_table(table_config):
    @task(task_id="generate_ddl_query")
    def generate_ddl_query(config):
        # Logic to generate the DDL query
        return {"ddl": "CREATE TABLE ...", "columns": ["col1", "col2"], "primary_keys": ["id"]}

    @task(task_id="create_snowflake_table")
    def create_snowflake_table(**context):
        ti = context["ti"]
        # Pull only the XCom value produced by this mapped instance
        ddl_result = ti.xcom_pull(task_ids="process_table.generate_ddl_query", map_indexes=ti.map_index)
        snowflake_ddl = ddl_result["ddl"]
        # Execute the DDL in Snowflake
        return snowflake_ddl

    generate_ddl_query(table_config) >> create_snowflake_table()

Expanding the Task Group

The .expand() function dynamically maps over the inputs, creating multiple instances of the task group based on the provided list or iterable.

    
table_configs = [
    {"sf_schema": "schema1", "sf_table": "table1"},
    {"sf_schema": "schema2", "sf_table": "table2"},
]

process_table.expand(table_config=table_configs)

This creates two instances of the process_table task group, each processing one configuration from table_configs.

Passing XCom Values Using ti.xcom_pull and map_indexes

Purpose of map_indexes

When working with mapped tasks or task groups, XCom values are generated for each mapped instance. The map_indexes parameter ensures that only the XCom value corresponding to the current mapped instance is retrieved.

Example Usage

Within a mapped task group:

    
@task(task_id="create_snowflake_table")
def create_snowflake_table(**context):
    ti = context["ti"]
    # Pull the XCom data for the current mapped instance using map_indexes
    ddl_result = ti.xcom_pull(task_ids="process_table.generate_ddl_query", map_indexes=ti.map_index)
    snowflake_ddl = ddl_result["ddl"]
    # Execute the Snowflake DDL query
    return snowflake_ddl

Key Parameters

The xcom_pull parameters that matter most here are:

- task_ids: the ID of the task to pull from; for a task inside a task group, use the group-prefixed ID (e.g., process_table.generate_ddl_query)
- key: the XCom key to pull, which defaults to return_value
- map_indexes: the index (or indexes) of the mapped instance(s) to pull from

If map_indexes is omitted, the XCom values of all mapped instances are returned together as a lazy iterable.
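
For instance, a downstream task outside the group can aggregate every mapped instance's result by omitting map_indexes. This is a minimal sketch; summarize_ddl is a hypothetical task name:

from airflow.decorators import task

@task(task_id="summarize_ddl")
def summarize_ddl(**context):
    ti = context["ti"]
    # No map_indexes: returns the XCom values of all mapped instances
    all_results = ti.xcom_pull(task_ids="process_table.generate_ddl_query")
    return [result["ddl"] for result in all_results]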

Dynamic Mapping Workflow Example

Below is an example workflow that demonstrates dynamic mapping with task groups:

    
from airflow.decorators import dag, task_group, task
from pendulum import datetime

@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def postgres_to_snowflake():
    table_configs = [
        {"sf_schema": "schema1", "sf_table": "table1", "pg_schema": "public", "pg_table": "table1"},
        {"sf_schema": "schema2", "sf_table": "table2", "pg_schema": "public", "pg_table": "table2"},
    ]

    @task_group(group_id="process_table")
    def process_table(table_config):
        @task(task_id="generate_ddl_query")
        def generate_ddl_query(config):
            # Generate the DDL for the target Snowflake table
            return {
                "ddl": f"CREATE TABLE {config['sf_schema']}.{config['sf_table']}",
                "columns": ["id", "name"],
                "primary_keys": ["id"],
            }

        @task(task_id="create_snowflake_table")
        def create_snowflake_table(**context):
            ti = context["ti"]
            # Pull the DDL generated by this mapped instance
            ddl_result = ti.xcom_pull(task_ids="process_table.generate_ddl_query", map_indexes=ti.map_index)
            snowflake_ddl = ddl_result["ddl"]
            # Execute the DDL logic here
            return snowflake_ddl

        generate_ddl_query(table_config) >> create_snowflake_table()

    process_table.expand(table_config=table_configs)

dag_instance = postgres_to_snowflake()

Best Practices and Additional Notes for Task Group Expansion in Airflow

When working with task groups and dynamic mapping in Airflow, adhering to best practices ensures maintainability, scalability, and clarity of workflows. Additionally, understanding the nuances of task group expansion helps avoid common pitfalls. Below are some combined best practices and additional notes for using .expand effectively with task groups.

Best Practices

Avoid Excessive Nesting

Avoid deeply nested task groups within mapped ones, as it can lead to UI complexity and make debugging harder. Instead, flatten your workflow design where possible for better readability.

Use Descriptive IDs

Ensure that both tasks and task groups have unique and descriptive IDs. This avoids conflicts and makes it easier to identify tasks in the Airflow UI.

Leverage Tooltips and Labels

Use tooltips or labels on dependencies to provide additional context for your tasks or task groups. This enhances clarity when visualizing workflows in the Airflow UI.
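
For example, the @task_group decorator accepts a tooltip, and edge labels from airflow.utils.edgemodifier can annotate dependencies. A minimal sketch, with illustrative names and label text:

from airflow.decorators import task, task_group
from airflow.utils.edgemodifier import Label

@task_group(group_id="process_table_labeled", tooltip="Creates one Snowflake table per config")
def process_table_labeled(table_config):
    @task
    def extract(config):
        return config

    @task
    def load(config):
        return config

    # The label appears on the edge between the two tasks in the Graph view
    extract(table_config) >> Label("validated config") >> load(table_config)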

Monitor Resource Usage

Optimize concurrency settings and resource allocation when expanding over large inputs to prevent overloading your infrastructure. E.g., limit DAG-wide parallelism with max_active_tasks, or cap concurrent mapped instances of a single task with max_active_tis_per_dag.
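
As an illustrative sketch (the DAG name and limits shown are arbitrary), concurrency can be capped at both the DAG and the task level:

from pendulum import datetime
from airflow.decorators import dag, task

# max_active_tasks caps concurrent task instances across the whole DAG
@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False, max_active_tasks=8)
def throttled_mapping():
    # max_active_tis_per_dag caps how many mapped instances of this task run at once
    @task(max_active_tis_per_dag=2)
    def process(item):
        return item

    process.expand(item=list(range(100)))

throttled_mapping()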

Additional Notes

Dynamic Mapping Behavior

Task group expansion maps every task inside the group over the same inputs: each mapped instance of the task group processes one element from the provided iterable.

Using .partial() with .expand()

Use .partial() alongside .expand() to pass constant parameters that do not change across mapped instances. For example:

    
process_table.partial(constant_param="value").expand(table_config=table_configs)
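
For this pattern to work, the task group function must accept the constant parameter. A minimal sketch, where constant_param and report are illustrative names:

@task_group(group_id="process_table")
def process_table(table_config, constant_param):
    @task
    def report(config, constant):
        # constant_param has the same value in every mapped instance
        return f"{constant}: {config['sf_table']}"

    report(table_config, constant_param)

process_table.partial(constant_param="value").expand(table_config=table_configs)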

Airflow UI Visibility

The Airflow UI provides detailed visibility into individual mapped instances within a task group. This feature is invaluable for debugging and monitoring purposes, as you can view the status of each mapped instance separately.


Author

Aditya Singhal

Data Engineer
