Branching in Airflow workflows
As our Airflow DAGs grew more complex, our workflows started to have multiple branches. Simple cases can be handled with custom checks; more complex ones require the Airflow API. The latter has some non-obvious behavior, which I describe in this article along with a way to work around it. I will use the following DAG as a sample for this discussion.
There are two ways of dealing with branching in Airflow DAGs: BranchPythonOperator and ShortCircuitOperator.
You can find articles about how to use them, and after reading those their behavior seems quite logical. For example, the article below covers both.
Branching in Airflow | Apache Airflow Guides
A powerful tool in Airflow is branching via the BranchPythonOperator. The BranchPythonOperator is similar to the…
But when you start dealing with more advanced workflows, you may notice that these operators do not let you do what seems logical. For example, if you skip only one task, you will find that tasks which were not supposed to be skipped were skipped as well. The tasks underlined in blue in the picture below should not be skipped when you skip the task.
This “erroneous” situation happens when you use the operators mentioned above. Trying some internal methods won’t solve anything either: neither #1 nor #2 below helps.
- Task instance
Why does this happen?
Airflow uses trigger rules to determine when a task should be executed. The available rules are defined in the TriggerRule class:
class TriggerRule:
    """Class with task's trigger rules."""

    ALL_SUCCESS = 'all_success'
    ALL_FAILED = 'all_failed'
    ALL_DONE = 'all_done'
    ONE_SUCCESS = 'one_success'
    ONE_FAILED = 'one_failed'
    NONE_FAILED = 'none_failed'
    NONE_FAILED_OR_SKIPPED = 'none_failed_or_skipped'
    NONE_SKIPPED = 'none_skipped'
    DUMMY = 'dummy'
Trigger rules are a long-standing feature of Airflow tasks, though I wasn’t able to find clear information about them in the documentation for the 2.x version. So below is the old documentation, which is still valid.
Concepts - Airflow Documentation
In Airflow, a DAG - or a Directed Acyclic Graph - is a collection of all the tasks you want to run, organized in a way that…
The default trigger rule for a task is all_success, which means that the task runs only if all of its parents have succeeded. In our case this condition is not satisfied, because one of the parents was skipped. This is why the task t7 in my example does not run. The rest are not executed because their parents, in turn, haven’t succeeded.
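The cascade described above can be reproduced with a small pure-Python model. This is a simplified sketch, not Airflow's scheduler logic: the DAG shape, the task names, and the resolve_all_success helper are hypothetical, and only the default all_success rule is modeled.

```python
# Simplified model of the default "all_success" rule; not Airflow's own code.
# Hypothetical DAG: branch -> (left, right) -> join -> report
PARENTS = {
    "branch": [],
    "left": ["branch"],
    "right": ["branch"],
    "join": ["left", "right"],
    "report": ["join"],
}


def resolve_all_success(parents, forced):
    """Return the final state of every task when all tasks use all_success.

    `forced` maps task ids to a state decided elsewhere, for example the
    branch that a branching operator did not pick and marked as "skipped".
    """
    states = {}
    for task, upstream in parents.items():  # dict is written in topological order
        if task in forced:
            states[task] = forced[task]
        elif any(states[u] in ("failed", "upstream_failed") for u in upstream):
            states[task] = "upstream_failed"
        elif any(states[u] == "skipped" for u in upstream):
            states[task] = "skipped"  # a single skipped parent is enough
        else:
            states[task] = "success"
    return states


final_states = resolve_all_success(PARENTS, forced={"right": "skipped"})
for task, state in final_states.items():
    print(f"{task}: {state}")
```

Only "right" was skipped on purpose, yet "join" and "report" end up skipped too, which mirrors the unexpected skips discussed in the text.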
Fix of the branching logic
The fix is pretty simple: operators in a DAG with branches should set the trigger rule to one_success. A task with this rule is executed as soon as at least one parent succeeds; it does not wait for all parents to finish. But here you need to pay attention to the synchronization of branches. If you have a workflow where all parents must finish before the task starts, this trigger rule would break the workflow.
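To make the synchronization caveat concrete, here is a rough readiness check for the two rules discussed. It is an illustrative sketch only: can_start is a hypothetical helper, the state names are plain strings, and Airflow's real dependency evaluation handles many more cases.

```python
def can_start(trigger_rule, parent_states):
    """Return True if a task with this rule may start right now (simplified)."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in parent_states)
    if trigger_rule == "one_success":
        return any(state == "success" for state in parent_states)
    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# Merge point after a branch: one parent ran, the other was skipped.
after_branch = ["success", "skipped"]
print(can_start("all_success", after_branch))  # False
print(can_start("one_success", after_branch))  # True

# Two parallel parents that must both finish: one is still running.
still_running = ["success", "running"]
print(can_start("one_success", still_running))  # True
```

The last call is the problematic case: with one_success the task may start while a sibling parent is still running.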
To cut a long story short, to make a workflow with branches work correctly, as shown in the picture below, you need to set trigger rules on the tasks that merge different branches.
And the code for this sample:
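A minimal sketch of such a DAG, assuming Airflow 2.x where DummyOperator is importable from airflow.operators.dummy. The DAG id, the task names, and the choose_branch callable are hypothetical stand-ins rather than the exact tasks from the picture; the relevant part is the trigger_rule argument on the merging task.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule


def choose_branch():
    # Hypothetical decision: return the task_id of the branch to follow.
    return "left"


with DAG(
    dag_id="branching_sketch",  # hypothetical name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch)
    left = DummyOperator(task_id="left")
    right = DummyOperator(task_id="right")

    # The merging task: with the default all_success it would be skipped,
    # because one of its parents is always skipped by the branch operator.
    join = DummyOperator(task_id="join", trigger_rule=TriggerRule.ONE_SUCCESS)

    # Downstream of the merge the default rule is fine again.
    report = DummyOperator(task_id="report")

    branch >> [left, right] >> join >> report
```

Only the task that merges the branches needs the non-default rule; tasks after it see a successful parent and run as usual.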