Lakeflow Declarative Pipelines (LDP), formerly Delta Live Tables, are the right tool when you want Databricks to own the incrementalization, streaming, and data-quality enforcement for a transformation. You declare tables and expectations; the framework handles the plumbing.

This guide builds a realistic bronze-to-silver pipeline with expectations, CDC, and the two configuration choices that catch teams out.

When LDP is the right tool

Before you build one, confirm LDP is the right choice.

Good fit:

- Bronze-to-silver-to-gold transformations where the platform should own incremental processing
- Streaming or CDC ingestion where checkpoints, retries, and ordering are better left to the framework
- Tables that need declarative, observable data-quality rules

Bad fit:

- One-off batch jobs with no incremental or quality requirements
- Logic that needs side effects mid-transformation (API calls, writes to external systems)
- SQL-first teams already standardized on dbt

If LDP is not a clean fit, see the dbt on Databricks quickstart for the alternative.

1. Write the pipeline

Create pipelines/silver_customers.py:

import dlt
from pyspark.sql.functions import col, trim

# Raw streaming source — one row per CDC event from Lakeflow Connect
@dlt.table(
    name="bronze_customer_events",
    comment="Raw Salesforce customer CDC events, append-only stream.",
    table_properties={"quality": "bronze"},
)
def bronze_customer_events():
    return (
        spark.readStream
             .format("cloudFiles")
             .option("cloudFiles.format", "json")
             .load("s3://causeway-bronze/salesforce/account/")
    )

# Silver streaming table — cleaned and typed, with quality gates
@dlt.table(
    name="silver_customers",
    comment="One row per customer, SCD-1 from Salesforce CDC.",
    table_properties={"quality": "silver"},
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect("reasonable_ltv", "ltv_usd BETWEEN 0 AND 1000000")
@dlt.expect_or_fail("valid_email", "email LIKE '%@%'")
def silver_customers():
    return (
        dlt.read_stream("bronze_customer_events")
           .select(
               col("customer_id").cast("bigint").alias("customer_id"),
               trim(col("email")).alias("email"),
               col("ltv_usd").cast("decimal(12, 2)"),
               col("_change_timestamp").alias("updated_at"),
           )
    )

Three things are worth noticing:

- The bronze table reads with Auto Loader (cloudFiles), so each run picks up only files that arrived since the last checkpoint.
- dlt.read_stream("bronze_customer_events") declares a streaming dependency, so LDP wires both tables into one DAG and processes each bronze row once.
- The three expectation decorators use three different severities: rows without a customer_id are dropped, implausible LTVs are only logged, and malformed emails stop the update.

2. Configure the pipeline

Declare the pipeline in your DAB:

# resources/pipelines/silver_customers.pipeline.yml
resources:
  pipelines:
    silver_customers:
      name: silver_customers_${bundle.target}
      catalog: ${var.catalog}
      target: silver
      serverless: true
      photon: true
      continuous: false
      channel: CURRENT
      development: false  # dev targets deployed with mode: development flip this to true automatically
      configuration:
        source_path: s3://causeway-${bundle.target}-bronze/salesforce/
      libraries:
        - file:
            path: ../../pipelines/silver_customers.py

Three settings determine behavior.

continuous: false (triggered)

Triggered pipelines run once, process new data, stop. Use this for batch and near-batch workloads. Orchestrate with Lakeflow Jobs or Airflow.

Continuous pipelines run indefinitely, processing data as it arrives. Use when latency matters and the source is truly streaming. Expect to pay for compute continuously, even when no data flows.

Warning

Avoid continuous unless you genuinely need streaming semantics. Many teams reach for continuous: true because "this is a streaming pipeline", then pay for idle compute 80% of the time. Triggered on a 5-minute schedule is often the right answer.
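To put rough numbers on that warning, a back-of-the-envelope sketch (the utilization figure is hypothetical; real costs depend on your DBU rates and cluster size):

```python
def monthly_compute_hours(active_hours_per_day: float, continuous: bool) -> float:
    """Estimate billable compute hours per 30-day month.

    A continuous pipeline bills around the clock; a triggered pipeline
    bills roughly for the hours it actually spends processing.
    """
    return 24.0 * 30 if continuous else active_hours_per_day * 30

# A pipeline busy ~4 hours/day: 720 hours continuous vs 120 hours triggered,
# a 6x difference before any per-run startup overhead.
```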

development: true (dev) vs development: false (prod)

| Setting | Dev (true) | Prod (false) |
| --- | --- | --- |
| Cluster reuse | Yes; faster iteration | No; fresh cluster each run |
| Pipeline retries | Disabled; failures surface immediately | Enabled |
| Table retention on code change | May be recreated | Persistent |
| Use case | Iterating on code | Stable execution |

Databricks Asset Bundles do not evaluate expressions inside ${...}, so set this flag per target: deploying a target with mode: development enables development mode for its pipelines automatically, while staging and prod keep development: false.

channel: CURRENT

CURRENT gets stable features. PREVIEW opts you into the next release early.

Causeway production pipelines run CURRENT. Preview channel is fine in dev for evaluating upcoming features; do not promote PREVIEW into production.

3. Expectations

Three severities:

# WARN — log violations, keep all rows
@dlt.expect("valid_email", "email IS NOT NULL AND email LIKE '%@%'")

# DROP — remove violating rows from the output
@dlt.expect_or_drop("valid_amount", "amount > 0")

# FAIL — halt the pipeline on any violation
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")

Or grouped:

@dlt.expect_all({
    "valid_email": "email IS NOT NULL",
    "valid_amount": "amount > 0",
    "valid_date": "transaction_date <= CURRENT_DATE()",
})

@dlt.expect_all_or_drop({
    "not_null_id": "id IS NOT NULL",
    "positive_quantity": "quantity > 0",
})
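Because the grouped forms take a plain dict, one maintainable pattern is a central rule registry filtered by layer, so every table pulls its expectations from one place. A sketch — the registry shape and tags here are illustrative, not part of the dlt API:

```python
# Hypothetical central registry: rule name -> (layer tag, SQL predicate).
RULES = {
    "valid_email": ("silver", "email IS NOT NULL AND email LIKE '%@%'"),
    "valid_amount": ("silver", "amount > 0"),
    "valid_date": ("gold", "transaction_date <= CURRENT_DATE()"),
}

def rules_for(layer: str) -> dict:
    """Return {name: predicate} for every rule tagged with the given layer."""
    return {name: pred for name, (tag, pred) in RULES.items() if tag == layer}

# In the pipeline:
#   @dlt.expect_all_or_drop(rules_for("silver"))
```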

Rules of thumb:

- Default to expect (warn): you keep every row and still get violation counts in the event log.
- Use expect_or_drop when a bad row would poison downstream aggregates and losing it is acceptable.
- Reserve expect_or_fail for conditions where continuing would be worse than stopping.

Danger

expect_or_fail halts the pipeline and blocks every downstream table until the issue is resolved. Use it only for conditions that mean the data cannot possibly be right: primary IDs that must not be null, keys that must be unique, currencies restricted to a known set.
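Dropping rows also destroys the evidence you need to debug the source. A common middle ground is a quarantine table: keep passing rows in silver and route failing rows to a side table built from the negated rules. A minimal sketch of the predicate construction (the pipeline wiring in the trailing comment is indicative, not prescriptive):

```python
rules = {
    "not_null_id": "id IS NOT NULL",
    "positive_quantity": "quantity > 0",
}

def keep_predicate(rules: dict) -> str:
    """SQL predicate that holds only when every rule passes."""
    return " AND ".join(f"({p})" for p in rules.values())

def quarantine_predicate(rules: dict) -> str:
    """SQL predicate matching rows that violate at least one rule."""
    return f"NOT ({keep_predicate(rules)})"

# In the pipeline, two tables can read the same source:
#   @dlt.table(name="silver_orders")
#   def silver_orders():
#       return dlt.read_stream("bronze_orders").where(keep_predicate(rules))
#
#   @dlt.table(name="quarantine_orders")
#   def quarantine_orders():
#       return dlt.read_stream("bronze_orders").where(quarantine_predicate(rules))
```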

4. CDC with AUTO CDC

For CDC sources, use the AUTO CDC flow, the successor to APPLY CHANGES INTO:

import dlt

# The CDC target is a streaming table declared without a query;
# the AUTO CDC flow below keeps it up to date.
dlt.create_streaming_table(
    name="silver_customers",
    comment="One row per customer, SCD-1 from Salesforce CDC.",
    table_properties={"quality": "silver"},
)

# Apply CDC changes from bronze → silver
dlt.create_auto_cdc_flow(
    target="silver_customers",
    source="bronze_customer_events",
    keys=["customer_id"],
    sequence_by="_change_timestamp",
    stored_as_scd_type=1,   # SCD-1: overwrite; use 2 for SCD-2 history
    apply_as_deletes="_change_type = 'DELETE'",
    apply_as_truncates="_change_type = 'TRUNCATE'",
)
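Of the three, sequence_by is the one that bites. AUTO CDC tolerates out-of-order arrival, but ties in the sequence column make the surviving version arbitrary — the classic symptom of sequencing by processing time. A quick pre-flight check you can run over a sample of bronze events (pure Python, illustrative):

```python
from collections import defaultdict

def ambiguous_keys(events, key, seq):
    """Return keys with two or more events sharing the same sequence value."""
    seen = defaultdict(set)
    bad = set()
    for event in events:
        k, s = event[key], event[seq]
        if s in seen[k]:
            bad.add(k)  # tie: AUTO CDC cannot tell which version wins
        seen[k].add(s)
    return bad

sample = [
    {"customer_id": 1, "_change_timestamp": "2026-02-01T10:00:00"},
    {"customer_id": 1, "_change_timestamp": "2026-02-01T10:00:00"},  # duplicate sequence value
    {"customer_id": 2, "_change_timestamp": "2026-02-01T10:05:00"},
]
```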

Three configuration choices:

- keys identifies the row: all events sharing a customer_id collapse into one record.
- sequence_by orders events: it must reflect the true change order (a CDC log sequence or source-side timestamp), never processing time.
- stored_as_scd_type picks the shape: 1 keeps only the latest values; 2 keeps full history with __START_AT and __END_AT columns.

5. Deploy and run

databricks bundle deploy --target dev
databricks bundle run silver_customers --target dev

The run URL lands you in the Databricks UI, where you see:

- the pipeline DAG with per-table row counts
- data-quality metrics for each expectation
- the event log for the update

6. Monitor via the event log

LDP writes a structured event log you can query:

-- All errors in the last 24 hours
SELECT timestamp, event_type, message, details
FROM event_log(TABLE(prod.silver.silver_customers))
WHERE level = 'ERROR'
  AND timestamp > CURRENT_TIMESTAMP() - INTERVAL 24 HOURS
ORDER BY timestamp DESC;

-- Expectation violations trend
SELECT
  DATE(timestamp) AS date,
  SUM(details:flow_progress.data_quality.dropped_records) AS dropped
FROM event_log(TABLE(prod.silver.silver_customers))
WHERE event_type = 'flow_progress'
GROUP BY DATE(timestamp)
ORDER BY date DESC;

Ship these queries to your observability stack (Grafana, Elementary, or the Causeway-internal dashboard). Rising drop counts are the early signal that a source changed.
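The rising-drop-count check is easy to automate once the daily series is exported; a sketch of the alert logic, with an illustrative window and threshold:

```python
def drops_rising(daily_drops, window=3, factor=2.0):
    """Flag when the mean of the last `window` days exceeds `factor` times
    the mean of the `window` days before them.

    `daily_drops` is ordered oldest to newest — e.g. the dropped-records
    series produced by the event-log query above.
    """
    if len(daily_drops) < 2 * window:
        return False  # not enough history to compare
    recent = sum(daily_drops[-window:]) / window
    baseline = sum(daily_drops[-2 * window:-window]) / window
    return recent > factor * max(baseline, 1.0)
```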

7. Full refresh

Sometimes you need to blow a pipeline away and rebuild:

databricks bundle run silver_customers --target prod --full-refresh-all

Warning

Full refresh rewrites every table in the pipeline. On large sources this is expensive. Consider --full-refresh silver_customers to fully refresh only the listed tables instead of --full-refresh-all, or a replace_where-style targeted backfill of the specific slice.

Common mistakes

| Symptom | Root cause |
| --- | --- |
| Pipeline runs succeed but tables are empty | dlt.read_stream returns no rows because the source has not produced new data since the checkpoint. Wait, or use dlt.read for batch semantics. |
| StreamingQueryException: Error reading checkpoint | Checkpoint corrupted (rare). Full-refresh the affected streaming table. |
| Pipeline runs in continuous mode despite config | continuous: false at the pipeline level but the notebook sets a continuous trigger. Pipeline config wins; check both. |
| Expectations silently ignored | Plain @dlt.expect warns by design: violations are logged and rows pass through. Use expect_or_drop or expect_or_fail to enforce. |
| AUTO CDC produces duplicate rows | sequence_by column is not monotonic in the source. Use a real CDC sequence, not current_timestamp(). |

See also