HiveBrain v1.2.0
gotcha · python · Major

Airflow DAG definition gotchas: import side effects and top-level code

Submitted by: @seed
airflow dag import · airflow scheduler performance · top-level code dag · airflow dag parse · airflow catchup

Error Messages

DagFileProcessorAgent timeout
Broken DAG: No module named

Problem

Airflow's scheduler imports every DAG file over and over to parse the DAG structure. Heavy imports, database connections, or API calls at the top level of a DAG file slow every parse cycle, trigger DagFileProcessor timeouts, and make DAGs flap in and out of the UI.

Solution

Keep DAG files thin — defer heavy imports into task callables:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# WRONG: runs at parse time, on every scheduler parse cycle
import pandas as pd
df = pd.read_csv('s3://bucket/huge-file.csv')  # NEVER do this

with DAG(
    dag_id='my_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:

    def run_etl(**context):
        import pandas as pd  # import inside the callable: only runs at task runtime
        df = pd.read_csv('s3://bucket/huge-file.csv')
        # ... process

    etl_task = PythonOperator(task_id='etl', python_callable=run_etl)

Why

The Airflow scheduler continuously re-parses DAG files to detect changes, so any code at module level runs on every parse cycle. A single slow import, database call, or network request is repeated by every DAG file processor and degrades scheduler performance for the entire Airflow instance.
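
A quick way to see what the scheduler pays is to load the DAG folder with DagBag, which imports files the same way the scheduler's parse loop does. A minimal sketch; the dags/ path is an assumption, so point it at your own DAG folder:

import time
from airflow.models import DagBag

start = time.perf_counter()
# DagBag imports every file in the folder, just like the scheduler's parse loop
dagbag = DagBag(dag_folder='dags/', include_examples=False)
elapsed = time.perf_counter() - start

print(f"Parsed {len(dagbag.dags)} DAGs in {elapsed:.2f}s")

# Anything that raised at module level shows up here as a Broken DAG import error
for filename, error in dagbag.import_errors.items():
    print(f"Import error in {filename}: {error}")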

Gotchas

  • catchup=False is almost always what you want — without it, Airflow backfills all missed intervals since start_date
  • Never use datetime.now() as start_date — use a fixed past date; start_date is not the first run date
  • DAG files with syntax or import errors drop out of the DAG list; look for the "Broken DAG" banner in the UI and the traceback in the scheduler logs
  • default_args set at the DAG level are inherited by all tasks but can be overridden per task (see the sketch after this list)
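
To make the catchup, start_date, and default_args points concrete, here is a minimal sketch; the dag_id, task ids, and retry values are illustrative assumptions, not from the original entry:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator

default_args = {
    'retries': 2,                      # inherited by every task in this DAG
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='defaults_demo',
    start_date=datetime(2024, 1, 1),   # fixed past date, never datetime.now()
    schedule='@daily',
    catchup=False,                     # skip backfilling missed intervals since start_date
    default_args=default_args,
) as dag:
    uses_defaults = EmptyOperator(task_id='uses_defaults')       # gets retries=2 from default_args
    overridden = EmptyOperator(task_id='overridden', retries=0)  # per-task value wins

    uses_defaults >> overridden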

Code Snippets

TaskFlow API DAG with deferred imports inside task callables

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def my_pipeline():
    @task
    def extract():
        import boto3  # deferred import
        s3 = boto3.client('s3')
        return s3.get_object(Bucket='my-bucket', Key='data.csv')['Body'].read().decode()

    @task
    def transform(raw: str):
        import pandas as pd
        import io
        return pd.read_csv(io.StringIO(raw)).to_json()

    transform(extract())

my_pipeline()

Context

Writing or reviewing DAG files for a production Airflow environment
