I knew TaskFlow existed. It's been in Airflow since 2.0, and I'd scrolled past it in the docs more than once. I just assumed it was decorator syntax layered over the same execution model, and I had actual pipelines to ship. So I kept writing PythonOperator tasks, wiring xcom_push and xcom_pull by hand between them, and calling it fine.
That was about a year of production work before I finally refactored something into TaskFlow and understood what I'd been missing.
The Part Traditional Operators Make You Do Yourself
The classic pattern goes like this. You define a Python callable, wrap it in a PythonOperator, and when one task needs to pass data to the next, you reach for XCom. XCom, if you're newer to Airflow, is how tasks store small values in the metadata database so a downstream task can retrieve them later. The upstream task calls ti.xcom_push(key='data', value=something) and the downstream task calls ti.xcom_pull(task_ids='upstream_task_name', key='data'), where you write both strings yourself, by hand, every time.
def extract(**kwargs):
    data = [1, 2, 3]
    kwargs['ti'].xcom_push(key='raw_data', value=data)

def transform(**kwargs):
    data = kwargs['ti'].xcom_pull(task_ids='extract', key='raw_data')
    return [x * 2 for x in data]
The failure mode is quiet. Get the task ID slightly wrong, forget to update a key string after renaming a task, and you get None downstream with no error at parse time, no warning in the logs. Just None showing up somewhere two steps later in a task that expected a list. I've wasted more debugging time on that specific bug class than I want to admit.
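To make that failure mode concrete, here is a small sketch that runs without Airflow installed. FakeTaskInstance is a hypothetical dict-backed stand-in, not Airflow's real task instance; it only mimics the behaviour described above, where pulling a key that was never pushed gives you None instead of an exception.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, backed by a dict."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared across tasks, like the metadata database

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        # A missing entry yields None rather than raising.
        return self._store.get((task_ids, key))


def extract(**kwargs):
    kwargs["ti"].xcom_push(key="raw_data", value=[1, 2, 3])


def transform(**kwargs):
    # Typo on purpose: upstream pushed 'raw_data', not 'raw_dta'.
    return kwargs["ti"].xcom_pull(task_ids="extract", key="raw_dta")


store = {}
extract(ti=FakeTaskInstance("extract", store))
result = transform(ti=FakeTaskInstance("transform", store))
print(result)  # None
```

Nothing fails at the point of the typo. The None only becomes a problem when something downstream tries to iterate over it.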
And beyond the debugging, there's just the volume of it. In a DAG with six or eight tasks, you're writing xcom_push and xcom_pull pairs constantly. None of it has anything to do with the logic you're trying to express. It's plumbing Airflow makes you wire yourself, and it accumulates fast.
What TaskFlow Changes
TaskFlow replaces that wiring with two decorators. You put @task on each function, and when one task's return value gets passed as an argument to the next, Airflow creates the XCom push and pull automatically. The @dag decorator wraps the pipeline itself, and you instantiate it at the bottom of the file. That's it.
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
def my_pipeline():
    @task
    def extract():
        return [1, 2, 3]

    @task
    def transform(data):
        return [x * 2 for x in data]

    transform(extract())

my_pipeline()
The execution engine underneath is the same and XCom is still happening, but you're not managing it — no key strings to mistype, no task_ids arguments to keep in sync with task names. The data just flows.
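If it helps to picture what "XCom is still happening" means, here is a toy model that runs without Airflow. It is not how Airflow implements @task: toy_task and XCOM_STORE are made-up names, and the real decorator builds a lazy reference at parse time and runs the function later on a worker, whereas this sketch runs everything immediately. The one detail borrowed from Airflow is that a task's return value is stored under the key return_value.

```python
XCOM_STORE = {}  # stand-in for the metadata database


def toy_task(func):
    """Toy decorator: run func, then record its return value like an XCom."""

    def wrapper(*args):
        result = func(*args)
        # Airflow stores a task's return value under the key 'return_value'.
        XCOM_STORE[(func.__name__, "return_value")] = result
        return result

    return wrapper


@toy_task
def extract():
    return [1, 2, 3]


@toy_task
def transform(data):
    return [x * 2 for x in data]


transform(extract())
print(XCOM_STORE)
```

Both return values end up in the store even though neither function mentions XCom, which is the part TaskFlow takes off your hands.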
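What I didn't expect was how much easier TaskFlow functions are to unit test. The logic is just a Python function that takes arguments and returns a value. One catch: calling the decorated transform(...) builds an Airflow task rather than running your code, so in a test you reach for the underlying callable (in Airflow 2.x the decorator keeps it on transform.function, so transform.function([1, 2, 3]) runs the logic), or you keep the logic in a plain helper that the task calls. Either way you're testing ordinary Python. Try that with a callable that expects ti in **kwargs and you're mocking Airflow internals before you've tested a single line of business logic.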
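Here is a rough sketch of that testing difference, written without Airflow imports so it runs anywhere. double_all and transform_classic are illustrative names, not from the original pipeline; double_all stands for the logic a @task function would hold, and transform_classic is the traditional-style equivalent.

```python
from unittest.mock import MagicMock


def double_all(data):
    """Business logic as a plain function; a @task wrapper would just call it."""
    return [x * 2 for x in data]


def transform_classic(**kwargs):
    """Traditional-style callable: needs a task instance to get its input."""
    data = kwargs["ti"].xcom_pull(task_ids="extract", key="raw_data")
    return [x * 2 for x in data]


def test_double_all():
    # No Airflow objects involved.
    assert double_all([1, 2, 3]) == [2, 4, 6]


def test_transform_classic():
    # The task instance has to be faked before any logic can run.
    ti = MagicMock()
    ti.xcom_pull.return_value = [1, 2, 3]
    assert transform_classic(ti=ti) == [2, 4, 6]
    ti.xcom_pull.assert_called_once_with(task_ids="extract", key="raw_data")


test_double_all()
test_transform_classic()
```

The second test spends most of its lines on the fake task instance, and it also has to repeat the task ID and key strings, so a rename can break the test as well as the DAG.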
The mental model shifts too. With traditional operators, I was always thinking about wiring: task A pushes to XCom with this key, task B pulls from task A using that key, and I have to keep all of that consistent across the file. With TaskFlow, I think about data moving through functions, which is actually how the pipeline works.
The Gas Price Pipeline That Made It Concrete
I had a small job running in production: pull daily gas price data from a public API, clean it up, and load it into PostgreSQL for a downstream dashboard. Three tasks, nothing complicated. I'd originally built it with the traditional approach, and it had been sitting there working fine for months.
The fetch task had this in it:
json_data = df.to_json(orient='records')
kwargs['ti'].xcom_push(key='raw_gas_data', value=json_data)
That to_json call wasn't there because the transform step needed JSON. It was there because XCom's default serialization is JSON-based, and a raw Pandas DataFrame doesn't pass through it cleanly. So I was converting to JSON in fetch, pushing the string, then immediately parsing it back into a DataFrame at the top of transform. The whole round trip existed to satisfy Airflow's plumbing, not anything related to the pipeline's purpose.
After the refactor:
@task
def fetch_gas_prices():
    return decoded_data  # list or dict, serializes fine

@task
def transform_gas_prices(raw_data):
    # work directly with raw_data, no JSON parsing needed
    return df.to_json(orient='records')

raw = fetch_gas_prices()
cleaned = transform_gas_prices(raw)
store_gas_prices(cleaned)
The logic inside each function barely changed. What disappeared was the JSON round trip, all the xcom_push and xcom_pull calls, and the **kwargs threading through everything. The code now describes the pipeline instead of describing how to configure Airflow to run it. That might sound like a small thing until you try to onboard someone to a DAG written the old way and watch them spend twenty minutes tracing XCom keys through four functions.
XCom Size Limits Don't Disappear With TaskFlow
TaskFlow can mislead you on this. Because passing data between tasks now looks like passing a variable between two Python functions, it starts to feel like it's free. It isn't.
Every value a TaskFlow task returns still gets serialized and written to Airflow's metadata database as an XCom. Exactly the same as before, just automatic. Return a DataFrame with a few million rows from your extract task and Airflow will try to serialize that entire object into a database that was never designed to hold payloads that size. I've seen this go wrong slowly: the metadata database bloats over weeks, and eventually DAG parsing and scheduling start degrading across the whole environment, not just the one DAG responsible. By the time you notice, it's not obvious what caused it.
The fix is to pass references once your data gets beyond trivially small. Write the DataFrame to S3 or a staging table in the extract task, return the file path or table name, and have the next task read from storage. A rough rule: if the value wouldn't fit comfortably in a Slack message, it probably doesn't belong in an XCom. That constraint was always there with the traditional approach — TaskFlow just makes it easier to forget.
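A minimal sketch of the pass-a-reference pattern, with several assumptions: a local temp directory stands in for S3 or a staging table (on a multi-worker deployment, local disk would not be shared between tasks), MAX_INLINE_CHARS is an arbitrary illustrative threshold rather than an Airflow setting, and extract and transform are plain functions that you would decorate with @task in a real DAG.

```python
import json
import tempfile
from pathlib import Path

MAX_INLINE_CHARS = 4000  # arbitrary illustrative threshold, not an Airflow setting


def extract(rows, staging_dir):
    """Return small payloads inline; stage larger ones and return a reference."""
    payload = json.dumps(rows)
    if len(payload) <= MAX_INLINE_CHARS:
        return {"kind": "inline", "rows": rows}
    path = Path(staging_dir) / "extract_output.json"
    path.write_text(payload)
    # Only this small dict would be stored as the XCom.
    return {"kind": "file", "path": str(path)}


def transform(ref):
    """Resolve the reference, then do the work (here: count the rows)."""
    if ref["kind"] == "inline":
        rows = ref["rows"]
    else:
        rows = json.loads(Path(ref["path"]).read_text())
    return len(rows)


with tempfile.TemporaryDirectory() as staging:
    small_ref = extract([{"price": 3.19}], staging)
    big_ref = extract([{"price": i / 100} for i in range(10_000)], staging)
    small_count = transform(small_ref)
    big_count = transform(big_ref)

print(small_ref["kind"], small_count)  # inline 1
print(big_ref["kind"], big_count)      # file 10000
```

Whatever extract returns stays a few dozen characters long no matter how many rows it handled, so the metadata database only ever sees the pointer.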
The Fastest Way to Understand It
Pick a DAG you've already written in the traditional style — something with three or four tasks where you can see the whole thing at once — and refactor it into TaskFlow. The point is to do it side by side with something you already understand, so the structural difference is obvious rather than buried under unfamiliar logic. Strip out the PythonOperator wrappers, add the decorators, replace the xcom_push and xcom_pull calls with return values and function arguments. The business logic inside each task barely moves. What changes is whether the file reads like a data pipeline or like a set of Airflow instructions that happen to contain one.
Most people who do this refactor want to go back and do it to everything else they've written. That's probably the clearest signal that it was worth understanding.