Event-driven scheduling is the single largest reliability improvement in Airflow 3. Assets replace ExternalTaskSensor; AssetWatchers replace the poke-every-30-seconds sensor pattern. This guide walks through the practical recipes for both.
When to go event-driven
Time-based schedules (@hourly, @daily, cron) are still the right choice for DAGs whose work has a natural cadence independent of upstream. A report run at 8 AM local is a schedule, not an event.
Reach for events when:
- The work should run as soon as upstream is ready, not on the next cron tick.
- The upstream producer does not run on the same clock you do.
- You are currently using a sensor or ExternalTaskSensor to wait.
Pattern 1: Asset for DAG-to-DAG
This is the canonical cross-DAG coordination pattern: DAG A writes an Asset; DAG B schedules on it.
Producer DAG
```python
from airflow.assets import Asset

orders_gold = Asset("s3://causeway-prod/gold/fct_orders/_delta_log/")

with DAG(
    dag_id="build_gold_orders",
    schedule="@hourly",
    ...
) as dag_a:
    transform = DatabricksRunNowOperator(...)

    @task(outlets=[orders_gold])
    def publish_signal(_):
        """Publishing this Asset triggers downstream consumers."""

    transform >> publish_signal(transform.output)
```
The outlets=[orders_gold] on the final task says "when this task succeeds, update the Asset". Downstream consumers see the update.
Consumer DAG
```python
with DAG(
    dag_id="refresh_executive_dashboard",
    schedule=[orders_gold],  # triggers when orders_gold updates
    catchup=False,
) as dag_b:
    refresh = PowerBIDatasetRefreshOperator(...)
```
schedule=[orders_gold] is the trigger. No clock, no sensor, no ExternalTaskSensor tax.
Multi-asset AND / OR
```python
# AND: trigger when both assets have updated since the last run
schedule=[asset_a, asset_b]

# OR: trigger when either updates
from airflow.assets import AssetAny
schedule=AssetAny(asset_a, asset_b)

# Complex: A AND (B OR C)
from airflow.assets import AssetAll, AssetAny
schedule=AssetAll(asset_a, AssetAny(asset_b, asset_c))
```
Tip
AND semantics (the default [asset_a, asset_b]) is almost always what you want for fan-in. "Refresh when both the orders and the customers tables are fresh" is AND. OR is rare; mostly useful for recovery-path DAGs that should trigger on whichever signal comes first.
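The AND/OR composition above can be sketched as a toy condition evaluator. This is plain Python, not Airflow internals; the function names simply mirror `AssetAll`/`AssetAny`, and "updated" is modeled as a set of asset names that have changed since the last run:

```python
# Toy model of asset-condition evaluation (illustration only, not Airflow code).
def asset_all(*conds):
    # AND: every sub-condition must hold
    return lambda updated: all(c(updated) for c in conds)

def asset_any(*conds):
    # OR: at least one sub-condition must hold
    return lambda updated: any(c(updated) for c in conds)

def asset(name):
    # Leaf: true once this asset has updated since the last run
    return lambda updated: name in updated

# A AND (B OR C), mirroring AssetAll(asset_a, AssetAny(asset_b, asset_c))
cond = asset_all(asset("a"), asset_any(asset("b"), asset("c")))

print(cond({"a", "b"}))  # True: a updated, and b satisfies the OR
print(cond({"b", "c"}))  # False: a has not updated since the last run
```

Reading the nested condition as a function of "what changed since my last run" is exactly how the AND default avoids duplicate fan-in runs: the condition stays false until every input has moved.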
Pattern 2: AssetWatcher for external triggers
When the upstream is outside Airflow (vendor SaaS, Kafka, S3 events, UC table updates), an AssetWatcher subscribes to the event source.
S3 arrival
```python
from airflow.assets import AssetWatcher
from airflow.providers.amazon.aws.assets import S3AssetWatcher

new_orders = AssetWatcher(
    name="new_orders_file",
    source=S3AssetWatcher(
        bucket_name="causeway-landing",
        key_prefix="orders/",
    ),
)
```
```python
with DAG(
    dag_id="process_order_file",
    schedule=new_orders,
    catchup=False,
) as dag:

    @task
    def process_arrival(uri: str):
        # uri from the event payload
        ...
```
S3 events flow through SNS or EventBridge to the watcher; no polling.
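Whatever the watcher hands your task ultimately derives from the standard S3 event notification payload. A minimal parser for that documented `Records` shape is below; note this is a sketch, and the exact payload your watcher surfaces may be wrapped differently (SNS envelopes, EventBridge detail fields):

```python
import json

def parse_s3_event(body: str) -> list[tuple[str, str]]:
    """Extract (bucket, key) pairs from a standard S3 event notification.

    Assumes the documented S3 `Records` shape; a real watcher may deliver
    the payload inside an SNS or EventBridge envelope.
    """
    event = json.loads(body)
    return [
        (r["s3"]["bucket"]["name"], r["s3"]["object"]["key"])
        for r in event.get("Records", [])
    ]

sample = json.dumps({
    "Records": [
        {"s3": {"bucket": {"name": "causeway-landing"},
                "object": {"key": "orders/2026-01-01/orders.csv"}}}
    ]
})
print(parse_s3_event(sample))
# [('causeway-landing', 'orders/2026-01-01/orders.csv')]
```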
SQS queue
```python
from airflow.providers.amazon.aws.assets import SqsAssetWatcher

task_queue = AssetWatcher(
    name="work_queue",
    source=SqsAssetWatcher(
        queue_url="https://sqs.us-east-1.amazonaws.com/…/causeway-work",
        max_messages=10,
        wait_time_seconds=20,
    ),
)
```
```python
with DAG(
    dag_id="handle_work",
    schedule=task_queue,
) as dag:

    @task
    def handle(event: dict):
        ...
```
Long-poll SQS; wake DAG on message arrival.
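The long-poll behavior the watcher relies on can be sketched without any SDK. Here `fetch` is a stand-in for an SQS ReceiveMessage call with a 20-second wait time; the loop shows why long-polling is cheap (it blocks inside `fetch` rather than spinning):

```python
def drain_queue(fetch, handle, max_empty_polls=3):
    """Long-poll loop sketch. `fetch()` blocks up to the wait time and
    returns a possibly-empty batch; stop after a few consecutive empty
    polls. `fetch` stands in for SQS ReceiveMessage with WaitTimeSeconds=20.
    """
    empty = 0
    handled = 0
    while empty < max_empty_polls:
        batch = fetch()
        if not batch:
            empty += 1
            continue
        empty = 0
        for msg in batch:
            handle(msg)
            handled += 1
    return handled

# Fake queue for illustration: two batches, then empty forever.
batches = iter([["m1", "m2"], ["m3"]])
seen = []
n = drain_queue(lambda: next(batches, []), seen.append)
print(n, seen)  # 3 ['m1', 'm2', 'm3']
```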
Unity Catalog table update
```python
from airflow.providers.databricks.assets import UnityCatalogAssetWatcher

uc_watcher = AssetWatcher(
    name="silver_customers_updated",
    source=UnityCatalogAssetWatcher(
        table="prod.silver.customers",
    ),
)
```
```python
with DAG(
    dag_id="rebuild_customer_360",
    schedule=uc_watcher,
) as dag:
    ...
```
Pattern 3: Asset + AssetWatcher hybrid
Mix the two when an external event triggers an internal pipeline that then publishes Assets of its own.
```python
# External arrival ──► Ingestion DAG ──► Asset update
#                                            │
#                                            ▼
#                                   Consumer DAGs trigger

landing_file = AssetWatcher(name="landing", source=S3AssetWatcher(...))
silver_orders = Asset("s3://.../silver/orders/_delta_log/")

with DAG(dag_id="ingest_orders", schedule=landing_file) as ingest:
    ...

    @task(outlets=[silver_orders])
    def publish(_):
        pass

with DAG(dag_id="build_gold_orders", schedule=[silver_orders]) as gold:
    ...
```
Ingestion waits on the file arriving; the gold build waits on silver finishing; both are event-driven end-to-end.
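The end-to-end chain is easiest to see as a tiny pub/sub: the external event triggers the ingest "DAG", whose asset publication triggers the gold "DAG". This is toy code, not Airflow's scheduler; the dict-based registry just models the wiring idea:

```python
# Toy scheduler: maps a trigger (watcher event or asset name) to the
# consumers it should start. Illustration only, not Airflow internals.
subscriptions: dict[str, list] = {}
runs: list[str] = []

def on(trigger, dag):
    subscriptions.setdefault(trigger, []).append(dag)

def fire(trigger):
    for dag in subscriptions.get(trigger, []):
        dag()

def ingest_orders():
    runs.append("ingest_orders")
    fire("silver_orders")  # models outlets=[silver_orders] on the last task

def build_gold_orders():
    runs.append("build_gold_orders")

on("landing_file", ingest_orders)        # models schedule=landing_file
on("silver_orders", build_gold_orders)   # models schedule=[silver_orders]

fire("landing_file")  # the external S3 arrival
print(runs)  # ['ingest_orders', 'build_gold_orders']
```

Nothing in the chain consults a clock: each run starts because the previous stage finished, which is the whole point of the hybrid.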
The sensor migration
Replace synchronous sensors with these patterns:
| Old pattern | 2026 replacement |
|---|---|
| ExternalTaskSensor waiting on another DAG | Asset published by the upstream, scheduled on by the consumer |
| S3KeySensor polling for a file | S3AssetWatcher subscribed to the bucket event |
| HttpSensor polling an API | HTTP asset watcher (custom) or a deferrable sensor with a short timeout |
| TimeDeltaSensor waiting for a clock moment | Direct schedule on the clock |
| SqlSensor polling a DB table | UC asset watcher or a deferrable SQL sensor |
Warning
A naive migration that keeps poke_interval=30 on a deferrable sensor is better than a synchronous sensor (it releases the worker slot), but it still costs Triggerer cycles on every poke. For genuinely event-driven triggers, AssetWatchers are cheaper and more reliable than any sensor polling pattern.
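To make that cost concrete, a back-of-envelope estimate (assuming one Triggerer wakeup per poke, which ignores batching a real Triggerer may do):

```python
# Back-of-envelope: deferred-sensor wakeups per day at a given poke interval.
SECONDS_PER_DAY = 86_400

def wakeups_per_day(poke_interval_s: int, n_sensors: int = 1) -> int:
    return SECONDS_PER_DAY // poke_interval_s * n_sensors

print(wakeups_per_day(30))       # 2880 wakeups/day for a single sensor
print(wakeups_per_day(30, 100))  # 288000 across a 100-sensor fleet
# An event-driven watcher does none of this: no wakeups until an event arrives.
```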
The TriggerDagRunOperator trap
Code written before Assets existed (which includes a lot of Airflow 2 code) often used TriggerDagRunOperator for cross-DAG coordination:
```python
# Legacy pattern; do not use for new DAGs
trigger_downstream = TriggerDagRunOperator(
    task_id="trigger_refresh",
    trigger_dag_id="refresh_dashboard",
)
```
Problems:
- Tight coupling. Producer knows the consumer's DAG ID.
- Breaks fan-in. Three producers triggering one consumer fire three separate runs; no aggregation.
- Hard to reason about from the consumer side. "What triggers this DAG?" requires searching the entire repo.
Assets invert the dependency: the consumer declares what it depends on; the producer declares what it publishes; the scheduler wires them up.
Important
For any new cross-DAG coordination, use Assets. Keep TriggerDagRunOperator only for legacy compatibility until you can refactor. The inversion makes the system easier to understand, easier to modify, and easier to fan in.
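The inversion is just a change in who holds the edge: consumers declare their inputs, and "what triggers this DAG?" becomes a derived lookup instead of a repo-wide search. A sketch of that idea (illustrative only; DAG and asset names are made up):

```python
# Consumers declare what they depend on; producers never name them.
consumer_schedules = {
    "refresh_executive_dashboard": ["gold_orders"],
    "build_customer_360": ["silver_customers"],
    "reconcile_finance": ["gold_orders", "silver_customers"],
}

def consumers_of(asset: str) -> list[str]:
    """Derived wiring: which DAGs trigger when `asset` updates?

    With TriggerDagRunOperator this knowledge lives inside each producer;
    with Assets the scheduler derives it from consumer declarations.
    """
    return sorted(d for d, deps in consumer_schedules.items() if asset in deps)

print(consumers_of("gold_orders"))
# ['reconcile_finance', 'refresh_executive_dashboard']
```

Fan-in also falls out for free: `reconcile_finance` lists two assets, and nobody upstream has to know it exists.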
Inspecting Assets
```shell
# List all assets
airflow assets list

# Show consumers of an asset
airflow assets list-consumers s3://causeway-prod/gold/fct_orders/_delta_log/

# Show the last update time
airflow assets show s3://causeway-prod/gold/fct_orders/_delta_log/

# Manually fire an asset update (rare; usually you let producers do this)
airflow assets update --uri s3://... --extra '{"reason": "manual backfill"}'
```
Common mistakes
| Symptom | Root cause |
|---|---|
| Consumer DAG does not trigger | Producer never emits the Asset (missing outlets=). |
| Consumer triggers too often | Multiple upstream tasks emit the same Asset; narrow outlets to the final task. |
| AssetWatcher never fires | Event source misconfigured (wrong bucket, missing SNS topic, IAM permission). |
| Watcher fires but DAG does not run | Triggerer not deployed; async scheduling requires it. |
| Recovery runs produce duplicates | Consumer DAG not idempotent on Asset-triggered runs. Apply the same idempotency rules as clock-driven DAGs. |
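The last row deserves its own sketch: dedupe asset-triggered work on a stable event key so retries and recovery replays are no-ops. This is a toy in-memory version; in production the `processed` ledger would be a table keyed on the event identity:

```python
processed: set[str] = set()
results: list[str] = []

def handle_event(event: dict) -> bool:
    """Idempotent consumer: key on a stable event identifier so a replayed
    trigger (recovery run, retry, duplicate delivery) is a no-op.
    In production `processed` would be durable storage, not a set.
    """
    key = event["uri"]  # stable identity for this asset update
    if key in processed:
        return False    # already handled; skip the work
    results.append(f"processed {key}")
    processed.add(key)
    return True

evt = {"uri": "s3://causeway-landing/orders/2026-01-01/orders.csv"}
print(handle_event(evt))  # True: first delivery does the work
print(handle_event(evt))  # False: the replay is a no-op
```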
See also
- Dependency types — choosing between direct, Asset, and watcher.
- DAG authoring guide — idempotency and atomicity apply to event-driven DAGs too.
- Airflow 3 changes — why Assets replaced Datasets.