Airflow handles success easily; the interesting work is what happens when something fails. This guide is the pragmatic playbook for the four mechanisms you have: retries, pools, sensors, and callbacks.
Start here: the distinction
The most common Airflow reliability bug is muddling these four mechanisms.
| Mechanism | For |
|---|---|
| Retries | Transient failures (network blip, 503, timeout) |
| Pools | Gating concurrency against a rate-limited external resource |
| Sensors | Waiting for something to become ready |
| Callbacks | Emitting signals (alerts, audits) on state transitions |
Using retries to wait for something to be ready produces flakiness. Using a sensor for a transient failure starves workers. Using a pool for transient errors does nothing useful. Get the frame right.
Retries
Every task should have retries configured. The default (no retries) is the wrong default.
from datetime import timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),
}
Exponential backoff
Without exponential backoff, all three retries fire at the same fixed interval and tend to fail the same way. With it, retries space out (5 min → 10 min → 20 min), giving the downstream system room to recover.
max_retry_delay caps the backoff. Without a cap, a task that retries five times could wait eight hours between the last two attempts, which usually is not what you want.
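The doubling-and-capping behaviour of the settings above can be sketched in plain Python. `backoff_delay` is an illustrative helper, not an Airflow API, and Airflow also adds random jitter, which is omitted here:

```python
from datetime import timedelta

def backoff_delay(attempt: int,
                  retry_delay: timedelta = timedelta(minutes=5),
                  max_retry_delay: timedelta = timedelta(hours=1)) -> timedelta:
    # Approximate wait before retry `attempt` (1-based): double each time,
    # then cap at max_retry_delay.
    delay = retry_delay * (2 ** (attempt - 1))
    return min(delay, max_retry_delay)
```

With the defaults above, attempts 1-3 wait 5, 10, and 20 minutes; by attempt 5 the uncapped delay would be 80 minutes, so the cap holds it to 1 hour.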
When to skip retries: AirflowFailException
from airflow.decorators import task
from airflow.exceptions import AirflowFailException

@task
def validate(payload: dict):
    if payload["schema_version"] not in {"v1", "v2"}:
        raise AirflowFailException(
            f"Unsupported schema {payload['schema_version']}. "
            "Cannot proceed; not retryable."
        )
AirflowFailException fails the task immediately and skips remaining retries. Use for:
- Bad input that will not change on retry.
- Unmet invariants (row counts, schema mismatches, null where not-null was required).
- Permission or configuration issues that need human intervention.
Danger
Do not retry a task twelve times hoping it will eventually work. If the error is deterministic, retries cost time and mask the real signal. AirflowFailException surfaces the problem immediately; the on-call engineer sees the real error instead of "retry 12/12 failed".
Pools
Pools gate concurrency across DAGs, against a shared external resource. They are the right answer for:
- Rate-limited APIs. Salesforce rate-limits at 5 concurrent requests; create a pool of 5.
- Warehouse saturation. Do not let every DAG hammer a single SQL warehouse.
- Limited licenses or test environments. Only 3 QA environments available; pool of 3.
from airflow.models import Pool
# Airflow 3 import path; on 2.x use airflow.operators.python
from airflow.providers.standard.operators.python import PythonOperator

# Define once (via UI, CLI, or a seeding DAG)
Pool.create_or_update_pool(
    name="salesforce_api",
    slots=5,
    description="Max concurrent Salesforce API calls across all DAGs",
)

# Use per-task
extract = PythonOperator(
    task_id="extract_from_sf",
    python_callable=fetch,  # defined elsewhere
    pool="salesforce_api",
    pool_slots=1,  # default; heavy tasks can claim multiple slots
)
Every task that touches the rate-limited resource references the pool. Airflow queues tasks when no slots are free; it does not reject them.
Pools vs retries
| Situation | Use |
|---|---|
| API returned 429 rate limit on this specific call | Retry with backoff |
| API can handle at most N concurrent calls at any time | Pool with N slots |
| Both | Pool + retry; the pool limits concurrency, retries handle the one-off 429s |
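The "both" row is the common production shape. A sketch, assuming the `salesforce_api` pool from above already exists and `fetch` is defined elsewhere (the import path is Airflow 3's standard provider; on 2.x it is `airflow.operators.python`):

```python
from datetime import timedelta

from airflow.providers.standard.operators.python import PythonOperator

extract = PythonOperator(
    task_id="extract_from_sf",
    python_callable=fetch,           # defined elsewhere
    pool="salesforce_api",           # caps concurrency across all DAGs
    retries=3,                       # still retry the one-off 429s
    retry_delay=timedelta(minutes=1),
    retry_exponential_backoff=True,
)
```

The pool prevents you from causing most 429s; the retries absorb the ones that slip through anyway.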
Sensors
Sensors wait for something to become ready: a file to appear, an external task to finish, a database row to exist. They are the right answer for "not ready yet".
The async rule
The most important rule in 2026:
Warning
Never use a synchronous sensor. A sensor in the classic poke mode holds a worker slot the entire time it waits; at scale, sensors starve real work. Use deferrable (async) variants: *Async operators or deferrable=True. They free the worker while waiting; the Triggerer process handles the async event loop.
# Wrong: synchronous; holds a worker slot until resolved
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait = S3KeySensor(
    task_id="wait_for_file",
    bucket_key="orders/2026-04-21.parquet",
    bucket_name="landing-zone",
    poke_interval=30,
    timeout=3600,
)

# Right: deferrable; releases the worker slot via the Triggerer
wait = S3KeySensor(
    task_id="wait_for_file",
    bucket_key="orders/2026-04-21.parquet",
    bucket_name="landing-zone",
    deferrable=True,  # <-- the difference
    poke_interval=30,
    timeout=3600,
)
Timeouts always
A sensor with no timeout waits forever. A sensor with a 10-year timeout is effectively the same thing. A reasonable default is 2 × expected upstream latency:
- Upstream file usually lands within 15 minutes → sensor timeout 30 minutes.
- Upstream DAG usually finishes within 2 hours → sensor timeout 4 hours.
If the timeout fires, the failure is visible; someone investigates; the real problem gets fixed. Without a timeout, the sensor silently hogs a slot indefinitely.
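The 2x rule is easy to encode once instead of re-deriving per sensor. A hypothetical helper (not an Airflow API) that produces the `timeout` argument in seconds, as sensors expect:

```python
from datetime import timedelta

def sensor_timeout(expected_upstream: timedelta, factor: float = 2.0) -> float:
    # Timeout in seconds: factor x the upstream's usual latency
    return expected_upstream.total_seconds() * factor
```

With this, a file that usually lands within 15 minutes gets `timeout=sensor_timeout(timedelta(minutes=15))`, i.e. 30 minutes.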
Sensors vs AssetWatchers
When the trigger is truly external (file arriving, event on a queue, table update), prefer an AssetWatcher over a sensor. A watcher subscribes to events; a sensor polls. Watchers are more reliable, cheaper, and lower latency.
Keep sensors for:
- Short, bounded waits where the polling cost is negligible.
- Polling for conditions that do not have an event source (legacy systems, specific API state checks).
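For comparison, a watcher-based sketch of event-driven scheduling in Airflow 3. The trigger class, queue URI, and all names here are assumptions; check the docs of the provider you actually use:

```python
# Sketch: schedule a DAG from queue events instead of polling.
from airflow.sdk import DAG, Asset, AssetWatcher
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger

# The watcher subscribes to events on the queue (URI is a placeholder)
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/123/orders")
orders_queue = Asset(
    "orders_queue",
    watchers=[AssetWatcher(name="orders_watcher", trigger=trigger)],
)

# The DAG runs when the asset receives an event; no sensor, no polling
with DAG(dag_id="process_orders", schedule=[orders_queue]):
    ...
```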
Callbacks
Callbacks fire on state transitions. Airflow supports on_failure_callback, on_retry_callback, on_success_callback, and on_skipped_callback.
Failure callbacks: replace email
SLAs were removed in Airflow 3; the replacement is on_failure_callback. Wire it to real alerting (PagerDuty, OpsGenie, Slack), not email.
def alert_on_failure(context):
    from airflow.models import TaskInstance

    ti: TaskInstance = context["ti"]
    # send_pagerduty_alert is your own integration wrapper
    send_pagerduty_alert(
        title=f"{ti.dag_id}/{ti.task_id} failed",
        body=(
            f"Run: {ti.run_id}\n"
            f"Logical date: {ti.logical_date}\n"
            f"Log URL: {ti.log_url}"
        ),
        severity="warning",
    )

default_args = {
    "on_failure_callback": alert_on_failure,
    ...
}
Important
Email is the beginning of alert fatigue. Route callbacks to PagerDuty (for pageable incidents), OpsGenie (for on-call routing), and Slack (for team visibility). Email alerts should go to a summary dashboard, not to individual engineers.
Retry callbacks
on_retry_callback fires when a task is about to retry. Useful for signaling "degraded but recovering":
def log_retry(context):
    ti = context["ti"]
    # metric() is your StatsD/Datadog wrapper
    metric("airflow.task.retry", tags={"dag": ti.dag_id, "task": ti.task_id})
Avoid on_success_callback for noisy work
Every successful task fires this callback. It runs inside the worker after the task body. A Slack post on every success produces alert fatigue within a week; reserve on_success_callback for observability (metric emission, lineage events), not notifications.
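A safe shape for on_success_callback is pure metric emission. This sketch fakes the context dict so it runs standalone; `sink` and the fake TaskInstance are stand-ins for a real metrics client and Airflow's real context:

```python
from types import SimpleNamespace

def emit_success_metric(context, sink):
    # Observability-only success callback: record a sample, notify no one.
    # `sink` stands in for a StatsD/Datadog client.
    ti = context["ti"]
    sink.append({
        "metric": "airflow.task.duration",
        "value": ti.duration,
        "tags": {"dag": ti.dag_id, "task": ti.task_id},
    })

# Simulate the context dict Airflow passes to callbacks
samples = []
fake_ti = SimpleNamespace(dag_id="orders", task_id="load", duration=12.5)
emit_success_metric({"ti": fake_ti}, samples)
```

In a DAG you would bind the real client, e.g. `functools.partial(emit_success_metric, sink=statsd_client)`, since Airflow calls the callback with only the context.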
Concurrency gotchas
Five concurrency knobs interact. Understand all of them before tuning one.
- parallelism (cluster-wide) — upper bound for the whole Airflow cluster.
- max_active_tasks_per_dag — how many tasks of one DAG can run simultaneously.
- max_active_runs_per_dag — how many runs of one DAG can be live in parallel. Set to 1 for stateful DAGs.
- max_active_tis_per_dagrun — concurrency of a single task within one run (matters for dynamically mapped tasks).
- Pools — cross-DAG gates for external resources.
Tuning in the wrong place is the classic trap. If Salesforce API is rate-limited, that is a pool concern, not a max_active_tasks_per_dag concern. If one DAG with overlapping runs corrupts its writes, that is max_active_runs=1, not a retry configuration.
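The per-DAG knobs above are set on the DAG object itself. A minimal sketch (the dag_id and values are placeholders):

```python
from airflow import DAG

with DAG(
    dag_id="stateful_loader",
    schedule="@hourly",
    max_active_runs=1,    # stateful DAG: never overlap runs
    max_active_tasks=8,   # this DAG's task concurrency ceiling
):
    ...
```

parallelism is cluster-level configuration (airflow.cfg or environment), not a DAG argument; pools live outside the DAG entirely.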
See concurrency reference for the full matrix.
A decision tree
Walk these when a task fails:
- Is the error deterministic (bad input, schema mismatch, missing permission)? → AirflowFailException; fix the root cause.
- Is the error transient (network blip, 503, timeout)? → retry with exponential backoff.
- Is the external resource rate-limited? → pool + retries for the one-offs.
- Is the task waiting for something outside Airflow's control? → deferrable sensor with a timeout, or an AssetWatcher.
- Did the task succeed but produce bad data? → row-count assertion + AirflowFailException; callbacks emit the signal.
Common mistakes
| Symptom | Root cause |
|---|---|
| Same error every retry, all fail | Retries used for deterministic failure; should be AirflowFailException. |
| Worker pool exhausted during a peak | Sync sensors holding slots; switch to deferrable. |
| Salesforce rate-limits the whole team | No pool; every DAG hits the API independently. |
| Alerts drowning the channel | on_success_callback posting to Slack on every task. |
| Retries fire close together, all 429 | No retry_exponential_backoff; add it. |
| Sensor waits forever | No timeout; set one at ~2x expected upstream latency. |
See also
- DAG authoring guide — where these mechanisms appear in a real DAG.
- Failure triage — the on-call procedure after a failure.
- Concurrency reference — all five concurrency knobs.