The quickstart showed the mechanics. This guide shows the rules that separate a DAG you can reason about from one that pages you at 3am.

The three properties

Three properties separate good DAGs from bad ones: idempotency, atomicity, and small, focused scope. Every rule in this guide comes back to one of them.

Idempotency

A DAG is idempotent if rerunning it with the same inputs produces the same outputs. This is non-negotiable. Every recovery story (backfill, replay, retry, partial failure) depends on it.

Practical rules:

- Derive every date, window, and partition key from logical_date, never from the wall clock.
- Write with MERGE / UPSERT, or overwrite the logical_date-scoped partition; a blind INSERT duplicates rows on rerun.
- Keep side effects replayable: a retry must be safe against state a previous attempt may have half-written.

Danger

Never use datetime.now() or "today" in task logic. It is the single most common idempotency bug. A backfill run for last Tuesday that writes with "today's" date silently corrupts the data warehouse. Always use logical_date; it is the one value stable across retries, backfills, and replays.
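The difference in plain Python (partition_bad and partition_good are illustrative helpers, not Airflow API):

```python
from datetime import datetime, timezone

# BAD: wall-clock date. A backfill for last Tuesday that runs today
# stamps today's partition and silently corrupts the warehouse.
def partition_bad() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")

# GOOD: derived from the run's logical_date, so it is stable across
# retries, backfills, and replays.
def partition_good(logical_date: datetime) -> str:
    return logical_date.strftime("%Y-%m-%d")

# A backfill run covering 2026-01-06 always writes to that partition,
# no matter when the backfill actually executes.
print(partition_good(datetime(2026, 1, 6, tzinfo=timezone.utc)))  # 2026-01-06
```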

Atomicity

Each task does one thing. A task that "fetches from the API, transforms, writes to S3, updates the catalog, notifies Slack" has five ways to fail ambiguously. Split it.

The exception: operations that must happen together to make sense (authenticate + call the API). Keep those fused. The rule is "one unit of recoverable work per task", not "one line of code per task".
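A sketch of the split (task names are hypothetical); each step is one unit of recoverable work that can retry on its own:

```python
from airflow.decorators import task

@task
def fetch_raw(window: dict) -> str:
    """Authenticate and fetch from the API (fused: they only make sense
    together); return the S3 key of the raw payload."""
    ...

@task
def transform(raw_key: str) -> str:
    """Transform raw -> silver; return the written table path."""
    ...

@task
def update_catalog(table_path: str) -> None:
    """Register the new partition in the catalog."""
    ...

# The Slack notification belongs in an on_failure_callback /
# on_success_callback, not in the data path.
```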

Small, focused DAGs

Prefer five 20-task DAGs over one 100-task DAG. Smaller DAGs:

- parse and render faster,
- isolate failures (one bad task does not block dozens of unrelated ones),
- have a clear owner and a clear blast radius,
- can be paused, backfilled, and rescheduled independently.

When DAGs need to coordinate, use Assets to link them rather than building one mega-DAG.
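For example, a downstream DAG (hypothetical dag_id) can schedule directly on the Asset the ingest publishes, instead of guessing with a cron offset:

```python
import pendulum
from airflow import DAG
from airflow.assets import Asset

silver_accounts = Asset("s3://causeway-prod/silver/sf_accounts/_delta_log/")

with DAG(
    dag_id="sf_accounts_rollup",     # hypothetical downstream consumer
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule=[silver_accounts],      # run when the Asset updates, not on a cron
    catchup=False,
):
    ...  # rollup tasks go here
```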

A realistic hourly DAG

Build an hourly ingest from Salesforce to a silver table on Databricks. The shape is typical of Causeway pipelines.

from __future__ import annotations
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.assets import Asset
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)

silver_accounts = Asset(
    "s3://causeway-prod/silver/sf_accounts/_delta_log/"
)

with DAG(
    dag_id="sf_accounts_hourly",
    description="Hourly ingest: Salesforce accounts -> silver Delta",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule="0 * * * *",
    catchup=False,
    max_active_runs=1,
    default_args={
        "owner": "data-engineering",
        "retries": 3,
        "retry_delay": pendulum.duration(minutes=5),
        "retry_exponential_backoff": True,
        "max_retry_delay": pendulum.duration(hours=1),
    },
    tags=["sf", "silver", "hourly"],
) as dag:

    # Derive window identifiers from the logical date. TaskFlow injects
    # logical_date because the parameter name matches a context key.
    @task
    def window_for(logical_date: pendulum.DateTime) -> dict:
        return {
            "window_start": logical_date.to_iso8601_string(),
            "window_end": logical_date.add(hours=1).to_iso8601_string(),
            "partition_date": logical_date.format("YYYY-MM-DD"),
            "partition_hour": logical_date.format("HH"),  # notebook params are strings
        }

    window = window_for()

    # Trigger a DAB-deployed Databricks job for the heavy lifting
    extract_and_load = DatabricksRunNowOperator(
        task_id="extract_and_load",
        databricks_conn_id="databricks_default",
        job_id="{{ var.value.sf_accounts_ingest_job_id }}",
        notebook_params=window,  # XComArg: resolves to window_for's dict at runtime
        deferrable=True,
    )

    # Validate row counts before publishing the Asset
    @task
    def assert_row_counts(window: dict) -> None:
        from databricks.sql import connect
        from airflow.exceptions import AirflowFailException

        with connect(...) as conn, conn.cursor() as cursor:
            cursor.execute(f"""
                SELECT COUNT(*)
                FROM prod.silver.sf_accounts
                WHERE ingested_at >= '{window["window_start"]}'
                  AND ingested_at <  '{window["window_end"]}'
            """)
            count = cursor.fetchone()[0]
        if count == 0:
            raise AirflowFailException(
                f"Zero rows for window {window['window_start']}. Investigate."
            )

    # Publishing the Asset triggers downstream DAGs
    @task(outlets=[silver_accounts])
    def publish(_window: dict) -> None:
        pass

    window >> extract_and_load >> assert_row_counts(window) >> publish(window)

Seven things in that DAG are load-bearing. Let's walk through them.

1. pendulum-timezoned start_date

start_date=pendulum.datetime(2026, 1, 1, tz="UTC")

Naive datetime(...) objects are interpreted as UTC regardless of what you intended; pendulum timezone-aware datetimes honor their timezone (including DST transitions). Use pendulum. Schedule machine work in UTC; translate to local time in reports.

2. catchup=False and max_active_runs=1

catchup=False,
max_active_runs=1,

catchup=False prevents the "DAG paused for a week dumps 168 runs" failure mode. max_active_runs=1 prevents two runs overlapping when the DAG is stateful (writes to logical_date-scoped partitions that would conflict on simultaneous writes).

3. default_args with real retry semantics

default_args={
    "retries": 3,
    "retry_delay": pendulum.duration(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": pendulum.duration(hours=1),
}

Exponential backoff with a cap protects against thundering-herd retries when a downstream service hiccups. Without backoff, retries fire close together and all fail the same way; with it, each retry waits roughly twice as long as the last, and max_retry_delay keeps the wait from growing past an hour.
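As a rough model (Airflow's real implementation also adds jitter; backoff_delay is an illustrative helper, not Airflow API), the capped schedule looks like:

```python
from datetime import timedelta

def backoff_delay(base: timedelta, attempt: int, cap: timedelta) -> timedelta:
    # Simplified capped exponential backoff: double the base delay on
    # each attempt, never exceeding the cap.
    return min(base * (2 ** (attempt - 1)), cap)

for attempt in range(1, 6):
    print(attempt, backoff_delay(timedelta(minutes=5), attempt, timedelta(hours=1)))
# Delays grow 5 -> 10 -> 20 -> 40 -> 60 minutes; the cap bites on attempt 5.
```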

Note

Retries are for transient failures, not for "the resource is not ready yet." Sensors handle "not ready"; pools handle "do not ask too often"; retries handle "tried once, got a 503, try again." Muddle these and you build flakiness.

4. logical_date passed through tasks

window = window_for()  # TaskFlow injects logical_date from the run context
# ...
extract_and_load = DatabricksRunNowOperator(
    notebook_params=window,
    ...
)

logical_date is the start of the interval the run covers, not wall-clock time. It is stable across retries, backfills, and replays. Use it as the partition key for every write. Pass it through to external systems (Databricks, APIs) so they can scope their work.

5. Deferrable operator

extract_and_load = DatabricksRunNowOperator(
    ...
    deferrable=True,
)

Deferrable operators release the worker slot while the external work runs. A 45-minute Databricks job does not hold an Airflow worker for 45 minutes; the Triggerer process handles the async wait.

Warning

Deferrable operators require a Triggerer process to be running. If you turn on deferrable=True without deploying a Triggerer, tasks will hang in the deferred state instead of completing. Astro deployments include the Triggerer by default; OSS installs must enable it explicitly.

6. AirflowFailException for unrecoverable errors

if count == 0:
    raise AirflowFailException("Zero rows...")

Regular exceptions retry per the retries config. AirflowFailException skips retries and fails the task immediately. Use it for unrecoverable conditions (bad input, unmet invariant); do not retry twelve times hoping it gets better.

7. Asset outlet as the publish signal

@task(outlets=[silver_accounts])
def publish(_window: dict) -> None:
    pass

Writing the Asset triggers any downstream DAG scheduled on it. Separating the write task from the publish task means you can assert invariants (assert_row_counts) before signaling downstream. A bad run never publishes.

Common mistakes

| Symptom | Root cause |
| --- | --- |
| Duplicate rows after backfill | Writes are blind INSERT. Change to MERGE / UPSERT. |
| Backfill wrote to wrong dates | datetime.now() or hard-coded "today" in task logic. Use logical_date. |
| Two runs clobbering each other | No max_active_runs=1 on a stateful DAG. |
| DAG parses slowly | Heavy imports or API calls at DAG-file top level. Move into task callables. |
| Retries fire close together, all fail | No retry_exponential_backoff or no max_retry_delay cap. |
| Worker slots exhausted | Synchronous sensors or non-deferrable long-running tasks. Switch to deferrable. |
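The first row in a toy sketch (blind_insert and upsert are illustrative stand-ins for the warehouse write): replaying the same batch through an append duplicates rows, while a write keyed on the primary key converges.

```python
def blind_insert(table: list, rows: list) -> None:
    table.extend(rows)            # append-only: duplicates on rerun

def upsert(table: dict, rows: list) -> None:
    for row in rows:
        table[row["id"]] = row    # keyed on the primary key: idempotent

batch = [{"id": 1, "name": "Acme"}]
appended, merged = [], {}
for _ in range(2):                # simulate a retry replaying the same batch
    blind_insert(appended, batch)
    upsert(merged, batch)
print(len(appended), len(merged))  # 2 1 -- only the upsert path is idempotent
```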

See also