The quickstart showed the mechanics. This guide shows the rules that separate a DAG you can reason about from one that pages you at 3am.
The three properties
Three properties separate good DAGs from bad ones: idempotency, atomicity, and small focus. Every rule in this guide comes back to one of them.
Idempotency
A DAG is idempotent if rerunning it with the same inputs produces the same outputs. This is non-negotiable. Every recovery story (backfill, replay, retry, partial failure) depends on it.
Practical rules:
- Writes are `MERGE`, `UPSERT`, or "delete this partition, then insert". Never blind `INSERT`. Two runs of the same logical date must produce the same table.
- Writes scope to a logical window. An hourly DAG writes to exactly the hour it runs for, using `logical_date` as the partition key.
- External side effects need idempotency keys. API POSTs, message publishes, and file writes all carry a deterministic key derived from `dag_id + task_id + logical_date`.
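As a sketch, the "delete this partition, then insert" pattern can live in a small helper; the table names and the `partition_date` column here are hypothetical:

```python
def idempotent_load_sql(table: str, staging_table: str, partition_date: str) -> list[str]:
    """Statement pair scoped to one logical partition.

    Running it twice for the same partition_date leaves the table in the
    same state: the delete wipes the earlier attempt before the insert.
    """
    return [
        f"DELETE FROM {table} WHERE partition_date = '{partition_date}'",
        f"INSERT INTO {table} "
        f"SELECT * FROM {staging_table} WHERE partition_date = '{partition_date}'",
    ]
```

On engines that support it, a single `MERGE` or `INSERT OVERWRITE PARTITION` gives the same guarantee in one atomic statement.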
Danger
Never use datetime.now() or "today" in task logic. It is the single most common idempotency bug. A backfill run for last Tuesday that writes with "today's" date silently corrupts the data warehouse. Always use logical_date; it is the one value stable across retries, backfills, and replays.
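For the idempotency keys mentioned above, a deterministic derivation might look like this (the helper name is illustrative, not an Airflow API):

```python
import hashlib

def idempotency_key(dag_id: str, task_id: str, logical_date_iso: str) -> str:
    """Same dag_id + task_id + logical_date -> same key on every retry,
    backfill, or replay. Wall-clock time never enters the derivation."""
    raw = f"{dag_id}:{task_id}:{logical_date_iso}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]
```

Send the key along with the side effect (for example an `Idempotency-Key` header on the POST) so the receiving system can deduplicate replays.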
Atomicity
Each task does one thing. A task that "fetches from the API, transforms, writes to S3, updates the catalog, notifies Slack" has five ways to fail ambiguously. Split it.
The exception: operations that must happen together to make sense (authenticate + call the API). Keep those fused. The rule is "one unit of recoverable work per task", not "one line of code per task".
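A condensed sketch of the split (task names and retry settings are hypothetical, bodies elided):

```python
from airflow.decorators import task

@task(retries=3)
def fetch_raw(window: dict) -> str:
    """Authenticate + call the API stay fused -- neither is a
    recoverable unit alone. Returns the S3 key of the raw payload."""
    ...

@task
def transform(raw_key: str) -> str:
    """Pure transform, rerunnable from the raw payload at any time."""
    ...

@task
def write_silver(clean_key: str) -> None:
    """Idempotent MERGE into the silver table."""
    ...
```

A failure in `write_silver` now retries only the write; the fetched payload is already durable in S3.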
Small, focused DAGs
Prefer five 20-task DAGs over one 100-task DAG. Smaller DAGs:
- Fail in isolation (one domain breaking does not block the rest).
- Deploy independently.
- Parse faster (the scheduler re-parses DAGs constantly).
- Read in the UI without scrolling.
When DAGs need to coordinate, use Assets to link them rather than building one mega-DAG.
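Concretely, a downstream DAG subscribes by putting the Asset in its schedule (the `dag_id` here is hypothetical; the Asset URI matches the example below):

```python
import pendulum
from airflow import DAG
from airflow.assets import Asset

silver_accounts = Asset("s3://causeway-prod/silver/sf_accounts/_delta_log/")

# Hypothetical downstream DAG: runs whenever the Asset is updated,
# instead of living inside one mega-DAG with the producer.
with DAG(
    dag_id="sf_accounts_reporting",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule=[silver_accounts],  # Asset-driven, not cron-driven
    catchup=False,
):
    ...
```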
A realistic hourly DAG
Build an hourly ingest from Salesforce to a silver table on Databricks. The shape is typical of Causeway pipelines.
```python
from __future__ import annotations

import pendulum

from airflow import DAG
from airflow.assets import Asset
from airflow.decorators import task
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)

silver_accounts = Asset(
    "s3://causeway-prod/silver/sf_accounts/_delta_log/"
)

with DAG(
    dag_id="sf_accounts_hourly",
    description="Hourly ingest: Salesforce accounts -> silver Delta",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule="0 * * * *",
    catchup=False,
    max_active_runs=1,
    # Render templated fields as native objects so notebook_params
    # arrives as a dict, not its str() representation.
    render_template_as_native_obj=True,
    default_args={
        "owner": "data-engineering",
        "retries": 3,
        "retry_delay": pendulum.duration(minutes=5),
        "retry_exponential_backoff": True,
        "max_retry_delay": pendulum.duration(hours=1),
    },
    tags=["sf", "silver", "hourly"],
) as dag:

    # Derive window identifiers from the logical date. Airflow injects
    # `logical_date` by parameter name; never compute it from the clock.
    @task
    def window_for(logical_date: pendulum.DateTime | None = None) -> dict:
        return {
            "window_start": logical_date.to_iso8601_string(),
            "window_end": logical_date.add(hours=1).to_iso8601_string(),
            "partition_date": logical_date.format("YYYY-MM-DD"),
            # Strings throughout: Databricks notebook params are string-valued.
            "partition_hour": logical_date.format("HH"),
        }

    window = window_for()

    # Trigger a DAB-deployed Databricks job for the heavy lifting
    extract_and_load = DatabricksRunNowOperator(
        task_id="extract_and_load",
        databricks_conn_id="databricks_default",
        job_id="{{ var.value.sf_accounts_ingest_job_id }}",
        notebook_params="{{ ti.xcom_pull(task_ids='window_for') }}",
        deferrable=True,
    )

    # Validate row counts before publishing the Asset
    @task
    def assert_row_counts(window: dict) -> None:
        from airflow.exceptions import AirflowFailException
        from databricks.sql import connect

        with connect(...) as conn, conn.cursor() as cursor:
            cursor.execute(f"""
                SELECT COUNT(*)
                FROM prod.silver.sf_accounts
                WHERE ingested_at >= '{window["window_start"]}'
                  AND ingested_at < '{window["window_end"]}'
            """)
            count = cursor.fetchone()[0]
        if count == 0:
            raise AirflowFailException(
                f"Zero rows for window {window['window_start']}. Investigate."
            )

    # Publishing the Asset triggers downstream DAGs
    @task(outlets=[silver_accounts])
    def publish(_window: dict) -> None:
        pass

    window >> extract_and_load >> assert_row_counts(window) >> publish(window)
```
Seven things in that DAG are load-bearing. Let's walk through them.
1. pendulum-timezoned start_date
```python
start_date=pendulum.datetime(2026, 1, 1, tz="UTC")
```
Naive `datetime(...)` objects are interpreted in the installation's default timezone (UTC unless you changed it), regardless of what you intended; pendulum timezone-aware datetimes honor the timezone you name, including DST transitions. Use pendulum. Schedule machine work in UTC; translate to local in reports.
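The same point, shown with the standard library's `zoneinfo` (pendulum behaves equivalently):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

ny = ZoneInfo("America/New_York")

# 2026-03-08 is the US spring-forward date: 02:00 local jumps to 03:00.
before = datetime(2026, 3, 8, 1, 30, tzinfo=ny)  # EST, UTC-5
after = datetime(2026, 3, 8, 3, 30, tzinfo=ny)   # EDT, UTC-4

assert before.utcoffset() == timedelta(hours=-5)
assert after.utcoffset() == timedelta(hours=-4)

# A naive datetime(2026, 3, 8, 2, 30) names a local time that never
# existed that day -- aware datetimes make such bugs visible.
```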
2. catchup=False and max_active_runs=1
```python
catchup=False,
max_active_runs=1,
```
catchup=False prevents the "DAG paused for a week dumps 168 runs" failure mode. max_active_runs=1 prevents two runs from overlapping when the DAG is stateful, where simultaneous writes would conflict.
3. default_args with real retry semantics
```python
default_args={
    "retries": 3,
    "retry_delay": pendulum.duration(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": pendulum.duration(hours=1),
}
```
Exponential backoff with a cap protects against thundering-herd retries when a downstream service hiccups. Without the cap, retries fire close together and all fail the same way; with it, each successive delay doubles toward the one-hour cap, giving the downstream system time to recover.
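The schedule above works out roughly like this (a simplified doubling model; Airflow's actual formula keys off the task's try number and adds jitter):

```python
def backoff_delays(base_s: int, retries: int, cap_s: int) -> list[int]:
    """Delay in seconds before each retry: double each time, capped."""
    return [min(base_s * 2**n, cap_s) for n in range(retries)]

# retry_delay=5 min, max_retry_delay=1 h, shown for 5 retries:
# [300, 600, 1200, 2400, 3600] -- the cap stops unbounded growth.
delays = backoff_delays(300, 5, 3600)
```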
Note
Retries are for transient failures, not for "the resource is not ready yet." Sensors handle "not ready"; pools handle "do not ask too often"; retries handle "tried once, got a 503, try again." Muddle these and you build flakiness.
4. logical_date passed through tasks
```python
window = window_for()  # Airflow injects logical_date by parameter name
# ...
extract_and_load = DatabricksRunNowOperator(
    notebook_params="{{ ti.xcom_pull(task_ids='window_for') }}",
    ...
)
```
logical_date is the start of the interval the run covers, not wall-clock time. It is stable across retries, backfills, and replays. Use it as the partition key for every write. Pass it through to external systems (Databricks, APIs) so they can scope their work.
5. Deferrable operator
```python
extract_and_load = DatabricksRunNowOperator(
    ...
    deferrable=True,
)
```
Deferrable operators release the worker slot while the external work runs. A 45-minute Databricks job does not hold an Airflow worker for 45 minutes; the Triggerer process handles the async wait.
Warning
Deferrable operators require a Triggerer process to be running. If you turn on deferrable=True without deploying a Triggerer, tasks will sit in the deferred state indefinitely; nothing is running to fire their triggers. Astro deployments include the Triggerer by default; OSS installs must enable it explicitly.
6. AirflowFailException for unrecoverable errors
```python
if count == 0:
    raise AirflowFailException("Zero rows...")
```
Regular exceptions retry per the retries config. AirflowFailException skips retries and fails the task immediately. Use it for unrecoverable conditions (bad input, unmet invariant); do not retry twelve times hoping it gets better.
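The distinction can be sketched outside Airflow with a stand-in exception (names here are hypothetical; in a real task you would raise AirflowFailException):

```python
class FailFast(Exception):
    """Stand-in for AirflowFailException: fail now, skip retries."""

def classify_failure(status_code: int, row_count: int) -> None:
    """Transient upstream trouble -> plain exception (retried per config).
    Broken invariant -> fail fast; retrying cannot fix bad input."""
    if status_code >= 500:
        raise RuntimeError(f"Upstream returned {status_code}; a retry may succeed")
    if row_count == 0:
        raise FailFast("Zero rows: unmet invariant, do not retry")
```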
7. Asset outlet as the publish signal
```python
@task(outlets=[silver_accounts])
def publish(_window: dict) -> None:
    pass
```
Writing the Asset triggers any downstream DAG scheduled on it. Separating the write task from the publish task means you can assert invariants (assert_row_counts) before signaling downstream. A bad run never publishes.
Common mistakes
| Symptom | Root cause |
|---|---|
| Duplicate rows after backfill | Writes are blind INSERT. Change to MERGE / UPSERT. |
| Backfill wrote to wrong dates | datetime.now() or hard-coded today in task logic. Use logical_date. |
| Two runs clobbering each other | No max_active_runs=1 on a stateful DAG. |
| DAG parses slowly | Heavy imports or API calls at DAG-file top level. Move into task callables. |
| Retries fire close together, all fail | No retry_exponential_backoff or no max_retry_delay cap. |
| Worker slots exhausted | Synchronous sensors or non-deferrable long-running tasks. Switch to deferrable. |
See also
- Dependency types — Assets, AssetWatchers, and direct dependencies.
- Error recovery — retries, pools, callbacks, poison-message handling.
- DAG authoring standards — the enforced rule set.