Lakeflow Declarative Pipelines (LDP), formerly Delta Live Tables, are the right tool when you want Databricks to own the incrementalization, streaming, and data-quality enforcement for a transformation. You declare tables and expectations; the framework handles the plumbing.
This guide builds a realistic bronze-to-silver pipeline with expectations, CDC, and the two configuration choices that catch teams out.
When LDP is the right tool
Before you build one, confirm LDP is the right choice.
Good fit:
- Streaming or near-real-time silver tables with quality gates.
- CDC ingestion (SCD-1 or SCD-2) from a change feed.
- Pipelines where you want automatic backfill, retry, and lineage.
Bad fit:
- Batch SQL transformations where dbt is already the team's idiom.
- Pipelines that need complex Jinja logic or macro libraries.
- One-shot transformations that do not benefit from incremental processing.
If LDP is not a clean fit, see the dbt on Databricks quickstart for the alternative.
1. Write the pipeline
Create pipelines/silver_customers.py:
```python
import dlt
from pyspark.sql.functions import col, trim

# Raw streaming source: one row per CDC event from Lakeflow Connect
@dlt.table(
    name="bronze_customer_events",
    comment="Raw Salesforce customer CDC events, append-only stream.",
    tblproperties={"quality": "bronze"},
)
def bronze_customer_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3://causeway-bronze/salesforce/account/")
    )

# Silver streaming table: cleaned, deduplicated, with quality gates
@dlt.table(
    name="silver_customers",
    comment="One row per customer, SCD-1 from Salesforce CDC.",
    tblproperties={"quality": "silver"},
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect("reasonable_ltv", "ltv_usd BETWEEN 0 AND 1000000")
@dlt.expect_or_fail("valid_email", "email LIKE '%@%'")
def silver_customers():
    return (
        dlt.read_stream("bronze_customer_events")
        .select(
            col("customer_id").cast("bigint").alias("customer_id"),
            trim(col("email")).alias("email"),
            col("ltv_usd").cast("decimal(12, 2)"),
            col("_change_timestamp").alias("updated_at"),
        )
    )
```
Three things are worth noticing:
- The `@dlt.table` decorator declares a table. LDP infers the name, creates it in Unity Catalog, and wires lineage.
- Expectations (`@dlt.expect`, `@dlt.expect_or_drop`, `@dlt.expect_or_fail`) are first-class. They log, drop, or halt depending on severity.
- `dlt.read_stream(...)` references another table in this pipeline by name. Cross-pipeline refs use `spark.readStream.table("...")`.
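If your team prefers SQL, the same silver table can be declared in LDP's SQL flavor. A sketch, assuming the same source and columns as the Python version above (the default severity with no `ON VIOLATION` clause is warn-only):

```sql
CREATE OR REFRESH STREAMING TABLE silver_customers(
  CONSTRAINT valid_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT reasonable_ltv EXPECT (ltv_usd BETWEEN 0 AND 1000000),
  CONSTRAINT valid_email EXPECT (email LIKE '%@%') ON VIOLATION FAIL UPDATE
)
COMMENT "One row per customer, SCD-1 from Salesforce CDC."
AS SELECT
  CAST(customer_id AS BIGINT) AS customer_id,
  TRIM(email) AS email,
  CAST(ltv_usd AS DECIMAL(12, 2)) AS ltv_usd,
  _change_timestamp AS updated_at
FROM STREAM(bronze_customer_events);
```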
2. Configure the pipeline
Declare the pipeline in your DAB:
```yaml
# resources/pipelines/silver_customers.pipeline.yml
resources:
  pipelines:
    silver_customers:
      name: silver_customers_${bundle.target}
      catalog: ${var.catalog}
      target: silver
      serverless: true
      photon: true
      continuous: false
      channel: CURRENT
      development: ${bundle.target == "dev"}
      configuration:
        source_path: s3://causeway-${bundle.target}-bronze/salesforce/
      libraries:
        - file:
            path: ../../pipelines/silver_customers.py
```
Three settings determine behavior.
continuous: false (triggered)
Triggered pipelines run once, process new data, stop. Use this for batch and near-batch workloads. Orchestrate with Lakeflow Jobs or Airflow.
Continuous pipelines run indefinitely, processing data as it arrives. Use when latency matters and the source is truly streaming. Expect to pay for compute continuously, even when no data flows.
Warning
Avoid continuous unless you genuinely need streaming semantics. Many teams reach for continuous: true because "this is a streaming pipeline", then pay for idle compute 80% of the time. Triggered on a 5-minute schedule is often the right answer.
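The triggered-on-a-schedule pattern pairs the pipeline with a Lakeflow Jobs resource in the same bundle. A minimal sketch, assuming the pipeline resource key from the snippet above (the job name is illustrative):

```yaml
# resources/jobs/silver_customers_schedule.job.yml
resources:
  jobs:
    silver_customers_schedule:
      name: silver_customers_schedule_${bundle.target}
      trigger:
        periodic:
          interval: 5
          unit: MINUTES
      tasks:
        - task_key: refresh_silver_customers
          pipeline_task:
            pipeline_id: ${resources.pipelines.silver_customers.id}
```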
development: true (dev) vs development: false (prod)
| Setting | Dev (`true`) | Prod (`false`) |
|---|---|---|
| Cluster reuse | Yes; faster iteration | No; fresh cluster each run |
| Expectations that fail | Logged only | Stop the pipeline |
| Table retention on code change | May be recreated | Persistent |
| Use case | Iterating on code | Stable execution |
The snippet above uses ${bundle.target == "dev"} so dev targets get dev behavior and staging/prod do not.
channel: CURRENT
CURRENT gets stable features. PREVIEW opts you into the next release early.
Causeway production pipelines run CURRENT. Preview channel is fine in dev for evaluating upcoming features; do not promote PREVIEW into production.
3. Expectations
Three severities:
```python
# WARN: log violations, keep all rows
@dlt.expect("valid_email", "email IS NOT NULL AND email LIKE '%@%'")

# DROP: remove violating rows from the output
@dlt.expect_or_drop("valid_amount", "amount > 0")

# FAIL: halt the pipeline on any violation
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
```
Or grouped:
```python
@dlt.expect_all({
    "valid_email": "email IS NOT NULL",
    "valid_amount": "amount > 0",
    "valid_date": "transaction_date <= CURRENT_DATE()",
})
@dlt.expect_all_or_drop({
    "not_null_id": "id IS NOT NULL",
    "positive_quantity": "quantity > 0",
})
```
Rules of thumb:
- `expect` (warn) for metrics you want to track over time: how often a condition is violated, and what the trend is.
- `expect_or_drop` for rows that should not flow downstream but should not halt the pipeline. This is data loss; monitor the drop count.
- `expect_or_fail` for invariants: a violation means something is fundamentally wrong and the pipeline should stop.
Danger
expect_or_fail halts the pipeline and blocks every downstream table until resolved. Use it only for conditions that represent "this data cannot possibly be right". Typical invariants: unique keys are unique; primary IDs are not null; currencies are in the expected set.
4. CDC with AUTO CDC
For CDC sources, use an AUTO CDC flow (the successor to the older `APPLY CHANGES INTO` API):
```python
import dlt
from pyspark.sql.functions import expr

# Declare the target table; the AUTO CDC flow populates it,
# so no query function is defined for it
dlt.create_streaming_table(
    name="silver_customers",
    comment="One row per customer, SCD-1 from Salesforce CDC.",
    table_properties={"quality": "silver"},
)

# Apply CDC changes from bronze to silver
dlt.create_auto_cdc_flow(
    target="silver_customers",
    source="bronze_customer_events",
    keys=["customer_id"],
    sequence_by="_change_timestamp",
    stored_as_scd_type=1,  # SCD-1: overwrite; use 2 for SCD-2 history
    apply_as_deletes=expr("_change_type = 'DELETE'"),
    apply_as_truncates=expr("_change_type = 'TRUNCATE'"),
)
```
Three configuration choices:
- `keys`: the primary key of the target table.
- `sequence_by`: the column whose ordering determines "latest" (a CDC timestamp or LSN).
- `stored_as_scd_type`: `1` (overwrite on change) or `2` (append a history row per change).
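What the SCD-1 flow does can be modeled in a few lines of plain Python: for each key, the event with the highest `sequence_by` value wins, and a `DELETE` that wins removes the row. A mental model only, not the engine (`scd1_apply` is a hypothetical helper):

```python
def scd1_apply(events, key, sequence_by):
    """Replay CDC event dicts into an SCD-1 snapshot keyed by `key`."""
    latest = {}
    for e in events:
        k = e[key]
        # Keep only the event with the highest sequence value per key
        if k not in latest or e[sequence_by] > latest[k][sequence_by]:
            latest[k] = e
    # Drop deleted keys; strip the CDC metadata column from survivors
    return {
        k: {f: v for f, v in e.items() if f != "_change_type"}
        for k, e in latest.items()
        if e.get("_change_type") != "DELETE"
    }

events = [
    {"customer_id": 1, "email": "a@x.com", "_change_timestamp": 1, "_change_type": "UPDATE"},
    {"customer_id": 1, "email": "b@x.com", "_change_timestamp": 2, "_change_type": "UPDATE"},
    {"customer_id": 2, "email": "c@x.com", "_change_timestamp": 1, "_change_type": "UPDATE"},
    {"customer_id": 2, "email": None, "_change_timestamp": 2, "_change_type": "DELETE"},
]
snapshot = scd1_apply(events, "customer_id", "_change_timestamp")
# customer 1 resolves to b@x.com; customer 2 was deleted
```

This also makes the monotonicity requirement concrete: if `sequence_by` ties or goes backwards, "latest" is ambiguous and the snapshot is wrong.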
5. Deploy and run
```bash
databricks bundle deploy --target dev
databricks bundle run silver_customers --target dev
```
The run URL lands you in the Databricks UI, where you see:
- A DAG of the tables the pipeline produces.
- Row counts per run.
- Expectation pass/fail/drop counts per expectation per run.
- The event log for the run.
6. Monitor via the event log
LDP writes a structured event log you can query:
```sql
-- All errors in the last 24 hours
SELECT timestamp, event_type, message, details
FROM event_log(TABLE(prod.silver.silver_customers))
WHERE level = 'ERROR'
  AND timestamp > CURRENT_TIMESTAMP() - INTERVAL 24 HOURS
ORDER BY timestamp DESC;

-- Expectation violations trend
SELECT
  DATE(timestamp) AS date,
  SUM(details:flow_progress:data_quality:dropped_records::long) AS dropped
FROM event_log(TABLE(prod.silver.silver_customers))
WHERE event_type = 'flow_progress'
GROUP BY DATE(timestamp)
ORDER BY date DESC;
```
Ship these queries to your observability stack (Grafana, Elementary, or the Causeway-internal dashboard). Rising drop counts are the early signal that a source changed.
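Once event-log rows are exported to the observability stack (here modeled as plain dicts), the drop trend is a simple aggregation. A sketch of what the dashboard query computes, with `dropped_per_day` as a hypothetical helper:

```python
from collections import defaultdict

def dropped_per_day(events):
    """Sum dropped-record counts per calendar date from flow_progress events."""
    totals = defaultdict(int)
    for e in events:
        if e["event_type"] != "flow_progress":
            continue
        date = e["timestamp"][:10]  # ISO timestamp prefix is YYYY-MM-DD
        totals[date] += e["details"]["flow_progress"]["data_quality"]["dropped_records"]
    return dict(totals)

events = [
    {"event_type": "flow_progress", "timestamp": "2025-06-01T08:00:00Z",
     "details": {"flow_progress": {"data_quality": {"dropped_records": 3}}}},
    {"event_type": "flow_progress", "timestamp": "2025-06-01T09:00:00Z",
     "details": {"flow_progress": {"data_quality": {"dropped_records": 2}}}},
    {"event_type": "update_progress", "timestamp": "2025-06-01T09:00:00Z",
     "details": {}},
]
# dropped_per_day(events) → {"2025-06-01": 5}
```

Alert on the trend, not individual runs: a single noisy batch is normal, a rising daily total is a schema or source change.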
7. Full refresh
Sometimes you need to blow a pipeline away and rebuild:
- Schema change that LDP cannot auto-evolve (column type change).
- Bug in an expectation that let bad rows through.
- Source data corrected for historical dates.
```bash
databricks bundle run silver_customers --target prod --full-refresh-all
```
Warning
Full refresh rewrites every table in the pipeline. On large sources this is expensive. Consider `--full-refresh silver_customers` to target only the affected table, or a `replace_where`-style targeted backfill on the specific slice.
Common mistakes
| Symptom | Root cause |
|---|---|
| Pipeline runs succeed but tables are empty | `dlt.read_stream` returns no rows because the source has produced no new data since the last checkpoint. Wait, or use `dlt.read` for batch mode. |
| `StreamingQueryException: Error reading checkpoint` | Checkpoint corrupted (rare). Full-refresh the affected streaming table. |
| Pipeline runs in continuous mode despite config | `continuous: false` at pipeline level but the notebook sets `trigger: continuous`. Pipeline config wins; check both. |
| Expectations silently ignored | Development mode with `development: true` logs failures instead of enforcing them. Expected during dev; fatal in prod. |
| AUTO CDC produces duplicate rows | The `sequence_by` column is not monotonic in the source. Use a real CDC sequence, not `current_timestamp()`. |
See also
- Lakeflow concepts — when LDP is the right tool vs dbt.
- Unity Catalog — where the pipeline's tables land.
- Asset Bundles — deploying the pipeline alongside its configuration.