Branching in Airflow workflows

As our Airflow DAGs grew more complex, our workflows started to have multiple branches. Simple cases can be implemented with custom checks; more complex ones require using the Airflow API. The latter has some non-obvious behavior, which I’m going to describe in this article, along with how to overcome it. I will use the following DAG as a sample for this discussion.

Workflow with branches

There are two ways of dealing with branching in Airflow DAGs: BranchPythonOperator and ShortCircuitOperator. You can find articles about how to use them, and after reading them their behavior seems quite logical. For example, the article below covers both.

But when you start dealing with more advanced workflows, you may notice that these operators don’t let you do what seems logical. For example, if you skip only one task, you will find that other tasks, which were not supposed to be skipped, were skipped as well. The tasks underlined with the blue line in the picture below should not be skipped when you skip task t4.

You wanted to skip only task t4, but tasks that were supposed to be executed were skipped as well

This “erroneous” situation happens when you use the operators mentioned above. Falling back on internal methods won’t solve anything either: neither #1 nor #2 below helps.

  1. Task instance ti.set_state(…) method.
  2. Operator’s self.skip(...) method.

Why does this happen?

Airflow uses trigger rules to determine whether a task should run, based on the states of its upstream (parent) tasks.

class TriggerRule:
    """Class with task's trigger rules."""

    ALL_SUCCESS = 'all_success'
    ALL_FAILED = 'all_failed'
    ALL_DONE = 'all_done'
    ONE_SUCCESS = 'one_success'
    ONE_FAILED = 'one_failed'
    NONE_FAILED = 'none_failed'
    NONE_FAILED_OR_SKIPPED = 'none_failed_or_skipped'
    NONE_SKIPPED = 'none_skipped'
    DUMMY = 'dummy'

Trigger rules are a long-standing feature of Airflow tasks, though I wasn’t able to find clear information about them in the documentation for version 2.x. Below is the old documentation, which is still valid.

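The default trigger rule for a task is all_success, which means the task runs only if all of its parents have succeeded. A skipped parent does not count as a success, so in our case this condition isn’t satisfied and the task is skipped instead. This is why task t7 in my example doesn’t run. The rest of the downstream tasks aren’t executed either, because their parents haven’t succeeded: the skip cascades down the DAG.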
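To make the cascade concrete, here is a tiny simulation in plain Python. It is a simplified model, not Airflow’s real scheduler: it only knows the success and skipped states and two rules, and the DAG is hypothetical (a branch that follows path_a and skips path_b, with both paths merging into join). With the default all_success, the skipped path_b makes join skip, and final skips after it; switching join to one_success lets both run.

```python
# Simplified model of trigger-rule evaluation; not Airflow's real scheduler code.
from graphlib import TopologicalSorter  # Python 3.9+

SUCCESS, SKIPPED = "success", "skipped"


def resolve(rule, parent_states):
    """Final state of a task once all of its parents are done."""
    if rule == "all_success":
        # A skipped parent can never become a success, so the task is skipped.
        return SUCCESS if all(s == SUCCESS for s in parent_states) else SKIPPED
    if rule == "one_success":
        # One successful parent is enough for the task to run.
        return SUCCESS if SUCCESS in parent_states else SKIPPED
    raise ValueError(f"rule not modelled: {rule}")


def simulate(upstream, forced, rules):
    """upstream: task -> parents; forced: pre-decided states; rules: task -> rule."""
    states = {}
    for task in TopologicalSorter(upstream).static_order():
        if task in forced:
            states[task] = forced[task]
        elif not upstream[task]:
            states[task] = SUCCESS  # a root task simply runs and succeeds
        else:
            parent_states = [states[p] for p in upstream[task]]
            states[task] = resolve(rules.get(task, "all_success"), parent_states)
    return states


# Hypothetical DAG: the branch follows path_a, so path_b is skipped.
upstream = {
    "branch": set(),
    "path_a": {"branch"},
    "path_b": {"branch"},
    "join": {"path_a", "path_b"},
    "final": {"join"},
}

default = simulate(upstream, forced={"path_b": SKIPPED}, rules={})
fixed = simulate(upstream, forced={"path_b": SKIPPED}, rules={"join": "one_success"})
print(default["join"], default["final"])  # skipped skipped
print(fixed["join"], fixed["final"])      # success success
```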

Fixing the branching logic

The fix is pretty simple: tasks in a DAG with branches should set the trigger rule to one_success. A task with this rule runs as soon as at least one parent succeeds; it does not wait for all parents to finish. But here you need to pay attention to how the branches are synchronized: if your workflow requires all parents to finish before the task starts, this trigger rule would break it.
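The synchronization concern can be illustrated with another simplified plain-Python model (again, not Airflow’s actual dependency check). It contrasts one_success with none_failed_or_skipped, which appears in the TriggerRule class above: under one_success the merge task may start while another branch is still running, whereas none_failed_or_skipped waits for all parents to finish, tolerates skipped ones, and requires at least one success. Airflow 2.2 deprecated none_failed_or_skipped in favor of none_failed_min_one_success.

```python
# Simplified model of trigger-rule readiness; not Airflow's real dependency checker.
DONE = {"success", "skipped", "failed"}


def can_start(rule, parent_states):
    """Return True if a task with `rule` may start given its parents' states."""
    if rule == "one_success":
        # Fires as soon as any parent succeeds, even if others are still running.
        return "success" in parent_states
    if rule == "none_failed_or_skipped":
        # Waits until every parent is done, then needs no failures and >= 1 success.
        return (
            all(s in DONE for s in parent_states)
            and "failed" not in parent_states
            and "success" in parent_states
        )
    raise ValueError(f"rule not modelled: {rule}")


# One branch has finished while the other one is still running.
states = ["success", "running"]
print(can_start("one_success", states))                          # True: starts too early
print(can_start("none_failed_or_skipped", states))               # False: waits for both
print(can_start("none_failed_or_skipped", ["success", "skipped"]))  # True
```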

To cut a long story short: to make the workflow work correctly with branches, as shown in the picture below, you need to set trigger rules on the tasks that merge different branches.

Correctly working workflow

And the code for this sample:


All opinions are my own || Software Developer, learner, perfectionist and entrepreneur-kind person, nonconformist. Always seeks order and completeness.

Alexander Goida
