Branching in Airflow workflows

How to avoid cascading skips of tasks

As our Airflow DAGs grew more complex, our workflows started to have multiple branches. Simple cases can be implemented with custom checks, while more complex ones require the Airflow API. The latter has some non-obvious behavior, which I describe in this article along with how to overcome it. I will use the following DAG as a sample for this discussion.

There are two ways of branching in Airflow DAGs: BranchPythonOperator and ShortCircuitOperator. Several articles describe how to use them, and after reading them their behavior seems quite logical. For example, the article below covers both.

However, once you start building more advanced workflows, you may notice that these operators don't let you do what seems logical. For example, if you skip a single task, you will find that tasks which were not supposed to be skipped are skipped as well. The tasks underlined in blue in the picture below should not be skipped when task t4 is skipped.

This “erroneous” situation happens with both operators mentioned above. Falling back to lower-level methods doesn't solve it either; neither of the following helps:

  1. Task instance ti.set_state(…) method.
  2. Operator’s self.skip(...) method.

Why does this happen?

Airflow uses trigger rules to decide whether a task should run, based on the states of its upstream (parent) tasks.

Trigger rules are a long-standing feature of Airflow tasks, though I wasn't able to find clear information about them in the documentation for version 2.x. The documentation below is for an older version, but it is still valid.

The default trigger rule is all_success, which means a task runs only if all of its parents have succeeded. In our case this condition isn't satisfied, which is why task t7 in my example does not run. A skipped parent makes an all_success task skip as well, so the skip cascades: the remaining downstream tasks aren't executed because their parents haven't succeeded.
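The cascade can be reproduced with a small pure-Python model. This is a simplified sketch, not Airflow's actual scheduler code: it ignores failures and retries, assumes every non-skipped task succeeds when its rule is met, and uses a hypothetical DAG (the names t1, t2, t3, t8 and the function `resolve_all_success` are made up for illustration) in which t7 merges the t2 → t3 branch with t4.

```python
from typing import Dict, List, Set

SUCCESS, SKIPPED = "success", "skipped"

# Hypothetical DAG (not the article's exact DAG): task -> upstream tasks,
# listed in topological order. t7 merges the t2 -> t3 branch with t4.
DAG: Dict[str, List[str]] = {
    "t1": [],
    "t2": ["t1"],
    "t3": ["t2"],
    "t4": ["t1"],
    "t7": ["t3", "t4"],
    "t8": ["t7"],
}


def resolve_all_success(dag: Dict[str, List[str]], skipped: Set[str]) -> Dict[str, str]:
    """Final task states when every task uses all_success (failures not modeled)."""
    states: Dict[str, str] = {}
    for task, parents in dag.items():
        if task in skipped:
            states[task] = SKIPPED  # skipped explicitly, e.g. by a branch
        elif all(states[p] == SUCCESS for p in parents):
            states[task] = SUCCESS  # all parents succeeded (or there are none)
        else:
            states[task] = SKIPPED  # a skipped parent propagates the skip
    return states


states = resolve_all_success(DAG, skipped={"t4"})
print([t for t, s in states.items() if s == SKIPPED])  # ['t4', 't7', 't8']
```

Skipping only t4 also skips t7 and everything after it, even though the t2 → t3 branch completed successfully.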

Fixing the branching logic

The fix is pretty simple: in a DAG with branches, set the trigger rule of the affected operators to one_success. A task with this rule runs as soon as at least one parent succeeds; it does not wait for all parents to finish. This is where you need to pay attention to synchronization between branches: if a task must wait until all of its parents have finished, this trigger rule breaks the workflow.
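Both effects of one_success, the fix and the synchronization pitfall, can be seen in a simplified decision function. This is a sketch under stated assumptions, not Airflow's implementation: the hypothetical `decide` helper only knows the parent states "success", "skipped" and "running", and treating a one_success task whose parents were all skipped as skipped is an assumption of this model.

```python
from typing import List


def decide(rule: str, parent_states: List[str]) -> str:
    """Return 'run', 'skip' or 'wait' for a task given its parents' current states.

    Simplified model: parent states are 'success', 'skipped' or 'running'.
    """
    if rule == "all_success":
        if "skipped" in parent_states:
            return "skip"  # one skipped parent is enough to skip the task
        if all(s == "success" for s in parent_states):
            return "run"
        return "wait"  # some parents are still running
    if rule == "one_success":
        if "success" in parent_states:
            return "run"  # does NOT wait for the remaining parents
        if all(s == "skipped" for s in parent_states):
            return "skip"  # model assumption: no parent can succeed any more
        return "wait"
    raise ValueError(f"trigger rule not modeled here: {rule}")


# A skipped branch: one_success lets the merge task run, all_success skips it.
print(decide("all_success", ["success", "skipped"]))  # skip
print(decide("one_success", ["success", "skipped"]))  # run

# The synchronization pitfall: one_success starts while a parent is still running.
print(decide("all_success", ["success", "running"]))  # wait
print(decide("one_success", ["success", "running"]))  # run
```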

To cut a long story short: to make the workflow handle branches correctly, as shown in the picture below, set the trigger rule on the tasks that merge different branches.
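In Airflow itself this means passing `trigger_rule="one_success"` (or `TriggerRule.ONE_SUCCESS` from `airflow.utils.trigger_rule`) to the operator of the merge task. The effect can be checked with the same kind of simplified pure-Python model as before. The DAG and the `resolve` helper are hypothetical, failures are not modeled, and only the merge task t7 gets one_success while every other task keeps the default all_success.

```python
from typing import Dict, List, Set

SUCCESS, SKIPPED = "success", "skipped"

# Hypothetical DAG in topological order; t7 is where the branches merge.
DAG: Dict[str, List[str]] = {
    "t1": [],
    "t2": ["t1"],
    "t3": ["t2"],
    "t4": ["t1"],
    "t7": ["t3", "t4"],
    "t8": ["t7"],
}


def resolve(dag: Dict[str, List[str]], rules: Dict[str, str], skipped: Set[str]) -> Dict[str, str]:
    """Final task states with per-task trigger rules (default: all_success)."""
    states: Dict[str, str] = {}
    for task, parents in dag.items():
        if task in skipped:
            states[task] = SKIPPED  # skipped explicitly, e.g. by a branch
            continue
        parent_states = [states[p] for p in parents]
        rule = rules.get(task, "all_success")
        if rule == "all_success":
            met = all(s == SUCCESS for s in parent_states)
        elif rule == "one_success":
            # Root tasks have no parents, so their rule is trivially met.
            met = not parents or any(s == SUCCESS for s in parent_states)
        else:
            raise ValueError(f"trigger rule not modeled here: {rule}")
        states[task] = SUCCESS if met else SKIPPED
    return states


states = resolve(DAG, rules={"t7": "one_success"}, skipped={"t4"})
print([t for t, s in states.items() if s == SKIPPED])  # ['t4']
```

Only the merge task needs the new rule: once t7 succeeds, its all_success children run normally, and tasks inside a single branch keep the stricter default.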

And the code for this sample:
