gotcha · python · Major
Airflow DAG definition gotchas: import side effects and top-level code
Tags: airflow dag import, airflow scheduler performance, top-level code dag, airflow dag parse, airflow catchup
Problem
Airflow's scheduler imports every DAG file repeatedly to parse the DAG structure. Heavy imports, database connections, or API calls at the top level of a DAG file slow that parsing, which can push files past the import timeout or make DAGs flap in and out of the UI.
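A minimal sketch of the failure mode (file name and sleep are illustrative): everything at module level executes each time the DAG processor imports the file, so a single slow statement inflates parse time and, past the import timeout, the file fails to load at all.

# dags/slow_dag.py (illustrative file name)
import time

print('parsing...')  # appears in DAG-processor logs on every parse cycle
time.sleep(10)       # runs at import time, not task time; stretches the
                     # parse and can exceed [core] dagbag_import_timeout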
Solution
Keep DAG files thin — defer heavy imports into task callables:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# WRONG — runs at parse time, on every parse cycle, not on a schedule
import pandas as pd
df = pd.read_csv('s3://bucket/huge-file.csv')  # NEVER do this

with DAG(
    dag_id='my_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:

    def run_etl(**context):
        import pandas as pd  # import inside the callable — only at task runtime
        df = pd.read_csv('s3://bucket/huge-file.csv')
        # ... process

    etl_task = PythonOperator(task_id='etl', python_callable=run_etl)
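The same rule covers anything that queries the metadata database at import time. A minimal sketch, with a hypothetical Variable name: fetching an Airflow Variable at module level fires a DB query on every parse, while fetching it inside the callable defers the query to task runtime.

from airflow.models import Variable

# WRONG — queries the metadata DB on every parse cycle
# api_key = Variable.get('my_api_key')

def run_job(**context):
    api_key = Variable.get('my_api_key')  # hypothetical key; queried only when the task runs
    # ... use api_key

In templated operator arguments, {{ var.value.my_api_key }} achieves the same deferral, since templates render at task runtime.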
Why
The Airflow scheduler continuously re-imports DAG files to detect changes. Any code at module level runs on every parse cycle, so a single slow import or I/O call is multiplied across every parse of every DAG file and degrades scheduler performance for the entire Airflow instance.
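If a heavy file cannot be slimmed down right away, the parse cadence itself is tunable. A stopgap sketch of the relevant airflow.cfg settings (values illustrative; defaults noted are for Airflow 2.x):

[scheduler]
# seconds to wait before re-parsing the same DAG file (default 30)
min_file_process_interval = 120

[core]
# seconds allowed to import one DAG file before parsing is aborted (default 30)
dagbag_import_timeout = 30

Raising min_file_process_interval trades slower pickup of DAG edits for less parse churn; it does not fix the underlying top-level code.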
Gotchas
- catchup=False is almost always what you want — without it, Airflow backfills all missed intervals since start_date
- Never use datetime.now() as start_date — use a fixed past date; start_date is not the first run date
- DAG files with syntax errors silently disappear from the UI — check scheduler logs, not the DAG list
- default_args set at the DAG level are inherited by all tasks but can be overridden per task (see the sketch below)
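A minimal sketch of that last point (dag_id and task names are illustrative): retries set in default_args reaches every task, and a per-task argument wins over the inherited value.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {'retries': 3, 'retry_delay': timedelta(minutes=5)}

with DAG(
    dag_id='inheritance_demo',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
) as dag:
    inherits = PythonOperator(   # inherits retries=3 from default_args
        task_id='inherits_defaults',
        python_callable=lambda: None,
    )
    overrides = PythonOperator(  # per-task retries=0 overrides default_args
        task_id='overrides_defaults',
        python_callable=lambda: None,
        retries=0,
    )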
Code Snippets
TaskFlow API DAG with deferred imports inside task callables
from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def my_pipeline():

    @task
    def extract():
        import boto3  # deferred import
        s3 = boto3.client('s3')
        return s3.get_object(Bucket='my-bucket', Key='data.csv')['Body'].read().decode()

    @task
    def transform(raw: str):
        import io

        import pandas as pd  # deferred import
        return pd.read_csv(io.StringIO(raw)).to_json()

    transform(extract())

my_pipeline()

Context
Writing or reviewing Airflow DAG files for a production Airflow environment